In [148]:
from pyspark.sql.functions import lit, col, udf, avg
from pyspark.sql.types import IntegerType, FloatType

In [7]:
# Read the CSV file, treating the first line as a header and letting Spark
# infer each column's type from the data.
inputDF = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .load("datathon_tadata.csv")
)

In [10]:
# Register inputDF as a temporary view so it can be referenced by the name
# "input_data" in SQL queries; an existing view with that name is replaced.
inputDF.createOrReplaceTempView("input_data")

In [36]:
# Print the column names and the types Spark inferred while reading the CSV.
inputDF.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- day: timestamp (nullable = true)
 |-- gender: integer (nullable = true)
 |-- p_sessionActivity: integer (nullable = true)
 |-- p_AddToCart: integer (nullable = true)
 |-- p_trafficChannel: string (nullable = true)
 |-- p_sessionDuration: integer (nullable = true)
 |-- p_pageViews: integer (nullable = true)
 |-- daysToCheckin: string (nullable = true)
 |-- osType: integer (nullable = true)
 |-- osTypeName: string (nullable = true)
 |-- daysFromPreviousVisit: integer (nullable = true)
 |-- p_TotalPrice: string (nullable = true)
 |-- isExclusiveMember: integer (nullable = true)
 |-- loggedIn: integer (nullable = true)
 |-- p_MapInteraction: integer (nullable = true)
 |-- BookingPurchase: integer (nullable = true)



# Prints the schema of inputDF (column names and inferred types).
inputDF.printSchema()

In [142]:
# Project the 15 columns of interest and discard every row that has a null in
# any of them.
selectDF = inputDF.select([
    "user_id",
    "gender",
    "p_sessionActivity",
    "p_AddToCart",
    "p_trafficChannel",
    "p_sessionDuration",
    "p_pageViews",
    "daysToCheckin",
    "osType",
    "daysFromPreviousVisit",
    "p_TotalPrice",
    "isExclusiveMember",
    "loggedIn",
    "p_MapInteraction",
    "BookingPurchase",
]).dropna()
selectDF.printSchema()
# inputDF.filter(inputDF["daysToCheckin"] != "NA").count()
# inputDF.select("user_id").distinct().count()

root
 |-- user_id: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- p_sessionActivity: integer (nullable = true)
 |-- p_AddToCart: integer (nullable = true)
 |-- p_trafficChannel: string (nullable = true)
 |-- p_sessionDuration: integer (nullable = true)
 |-- p_pageViews: integer (nullable = true)
 |-- daysToCheckin: string (nullable = true)
 |-- osType: integer (nullable = true)
 |-- daysFromPreviousVisit: integer (nullable = true)
 |-- p_TotalPrice: string (nullable = true)
 |-- isExclusiveMember: integer (nullable = true)
 |-- loggedIn: integer (nullable = true)
 |-- p_MapInteraction: integer (nullable = true)
 |-- BookingPurchase: integer (nullable = true)



In [149]:
# Compute both column means in one aggregation so the data is scanned once
# instead of once per column. The resulting Row holds the averages in the order
# they are requested, so it unpacks directly into the two names.
# NOTE(review): both columns are string-typed and hold "NA" markers; avg() is
# relied on to skip the non-numeric entries (the recorded means are finite).
avg_price, avg_checkin_days = (
    float(mean_value)
    for mean_value in selectDF.agg(avg("p_TotalPrice"), avg("daysToCheckin")).first()
)

In [150]:
# Show the mean total price and mean days-to-checkin; these are the values
# substituted for "NA" entries when the cleaned columns are built.
print(avg_price, avg_checkin_days)

# UDF to filter and replace value
def filterNA(cell_val, check_val, replace_val):
    """Return ``replace_val`` for a missing cell, otherwise the cell as a float.

    Args:
        cell_val: The raw cell value (a numeric string, or the missing marker).
        check_val: The marker that denotes a missing value (e.g. ``"NA"``).
        replace_val: The value to return in place of a missing cell.

    Returns:
        ``replace_val`` when ``cell_val`` is ``None`` or equals ``check_val``;
        otherwise ``float(cell_val)``.

    Raises:
        ValueError: If ``cell_val`` is neither missing nor parseable as a float.
    """
    # No per-row print here: this function runs once for every row, so debug
    # output would flood the logs and slow the job down.
    # A null cell is treated as missing too; float(None) would raise TypeError.
    if cell_val is None or cell_val == check_val:
        return replace_val
    return float(cell_val)

# Wrap filterNA as a Spark UDF whose declared return type is FloatType.
filter_na_df = udf(filterNA, FloatType())

# Add float versions of the two string columns, substituting the column mean
# wherever the cell is "NA", then remove the original string columns.
cleanedDF = (
    selectDF
    .withColumn(
        "cleaned_daysToCheckin",
        filter_na_df("daysToCheckin", lit("NA"), lit(avg_checkin_days)),
    )
    .withColumn(
        "cleaned_totalPrice",
        filter_na_df("p_TotalPrice", lit("NA"), lit(avg_price)),
    )
    .drop("daysToCheckin", "p_TotalPrice")
)

1323.226778024252 68.29791432633138


In [151]:
# Number of rows in the cleaned DataFrame; as the last expression in the cell,
# the value is displayed as the cell's output.
cleanedDF.count()

1000000

In [152]:
# List every distinct daysToCheckin value. The row limit handed to show() is
# the total row count of inputDF, so no distinct value can be cut off.
inputDF.select("daysToCheckin").distinct().show(n=inputDF.count())

+-------------+
|daysToCheckin|
+-------------+
|          296|
|          125|
|            7|
|           51|
|          124|
|          447|
|          307|
|          169|
|          205|
|          544|
|          272|
|           15|
|           54|
|          232|
|          234|
|          282|
|          383|
|          155|
|          154|
|          132|
|          317|
|          200|
|          388|
|          495|
|           11|
|          101|
|          279|
|          415|
|          433|
|          138|
|          323|
|          351|
|          361|
|          387|
|           29|
|           69|
|          309|
|           42|
|          112|
|           73|
|           87|
|          468|
|           64|
|          308|
|          348|
|          356|
|            3|
|           30|
|          113|
|          432|
|           34|
|          133|
|          287|
|          365|
|          389|
|          162|
|           59|
|          139|
|          146|
|       

In [158]:
from pyspark.ml.feature import StringIndexer

# Peek at the raw traffic-channel strings before indexing them.
cleanedDF.select("p_trafficChannel").show()

# Map each channel string to a numeric index in a new column, then drop the
# original string column.
indexer = StringIndexer(outputCol="trafficChannelIndex", inputCol="p_trafficChannel")
indexedDF = (
    indexer
    .fit(cleanedDF)
    .transform(cleanedDF)
    .drop("p_trafficChannel")
)
indexedDF.show()

+----------------+
|p_trafficChannel|
+----------------+
|               O|
|               O|
|               O|
|               O|
|               O|
|               A|
|               A|
|               O|
|               H|
|               O|
|               O|
|               O|
|               O|
|               O|
|               O|
|               H|
|               A|
|               O|
|               O|
|               A|
+----------------+
only showing top 20 rows

+-------+------+-----------------+-----------+-----------------+-----------+------+---------------------+-----------------+--------+----------------+---------------+---------------------+------------------+-------------------+
|user_id|gender|p_sessionActivity|p_AddToCart|p_sessionDuration|p_pageViews|osType|daysFromPreviousVisit|isExclusiveMember|loggedIn|p_MapInteraction|BookingPurchase|cleaned_daysToCheckin|cleaned_totalPrice|trafficChannelIndex|
+-------+------+-----------------+-----------+-----------------+

In [159]:
# Print the schema first and leave count() as the final expression: a notebook
# cell only displays the value of its last expression, so a count() placed
# ahead of printSchema() is computed but never shown.
indexedDF.printSchema()
indexedDF.count()

root
 |-- user_id: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- p_sessionActivity: integer (nullable = true)
 |-- p_AddToCart: integer (nullable = true)
 |-- p_sessionDuration: integer (nullable = true)
 |-- p_pageViews: integer (nullable = true)
 |-- osType: integer (nullable = true)
 |-- daysFromPreviousVisit: integer (nullable = true)
 |-- isExclusiveMember: integer (nullable = true)
 |-- loggedIn: integer (nullable = true)
 |-- p_MapInteraction: integer (nullable = true)
 |-- BookingPurchase: integer (nullable = true)
 |-- cleaned_daysToCheckin: float (nullable = true)
 |-- cleaned_totalPrice: float (nullable = true)
 |-- trafficChannelIndex: double (nullable = true)



In [161]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# One-hot encode the indexed traffic channel into a vector column.
encoder1 = OneHotEncoder(outputCol="trafficChannelVec", inputCol="trafficChannelIndex")
encodedDF1 = encoder1.transform(indexedDF)

# One-hot encode the integer osType column on top of the previous result.
encoder2 = OneHotEncoder(outputCol="osTypeVec", inputCol="osType")
encodedDF2 = encoder2.transform(encodedDF1)
# encodedDF2.show()

In [None]:
from pyspark.ml.feature import VectorAssembler

# Columns that are combined into the single "features" vector column.
feature_cols = ["gender", "p_sessionActivity", "p_AddToCart", "p_sessionDuration", "p_pageViews", "osType"]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features")

# Assemble from encodedDF2, the most recent DataFrame built in the preceding
# cells; the `dataset` name used here before is never assigned in those cells.
output = assembler.transform(encodedDF2)

# Report the columns that were actually assembled rather than a fixed list.
print("Assembled columns %s to vector column 'features'" % ", ".join(repr(c) for c in feature_cols))

# Show the feature vector next to BookingPurchase; there is no `clicked` column
# in this data. NOTE(review): BookingPurchase is presumably the label - confirm.
output.select("features", "BookingPurchase").show(truncate=False)