In [1]:
from pyspark.sql import SparkSession
import re
from pyspark.sql import Row

In [2]:
# Reuse the SparkSession already running in this kernel, or start one if none exists.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [5]:
# Load the JSON samples into a DataFrame; Spark infers the column types.
df = spark.read.format('json').load("samples/samples.json")

In [6]:
# Inspect the schema Spark inferred for the JSON input...
print('datatype columns')
df.printSchema()

# ...then preview a handful of rows (long cell values are shortened).
print('first 5 observations')
df.show(n=5, truncate=True)

datatype columns
root
 |-- feature_1: double (nullable = true)
 |-- feature_2: string (nullable = true)
 |-- feature_3: double (nullable = true)
 |-- feature_4: double (nullable = true)
 |-- feature_5: double (nullable = true)
 |-- feature_6: long (nullable = true)
 |-- feature_7: double (nullable = true)
 |-- feature_8: double (nullable = true)
 |-- user_id: string (nullable = true)

first 5 observations
+-----------------+---------+------------------+------------------+--------------------+---------+------------------+--------------------+--------------------+
|        feature_1|feature_2|         feature_3|         feature_4|           feature_5|feature_6|         feature_7|           feature_8|             user_id|
+-----------------+---------+------------------+------------------+--------------------+---------+------------------+--------------------+--------------------+
|3.436424754118279|        C|-4.937046093832398| 33.47386765484094|   9042.470622894587|        4| 9.3224498133

In [8]:
# The RDD API is reached through the SparkContext that backs the session.
sc = spark.sparkContext

# Splitting on the three "key=" markers leaves an empty leading piece, which is dropped.
# NOTE(review): assumes every line lists user_id, feature_9, feature_10 in exactly
# that order and that nothing sits between a value and the next key — TODO confirm
# against samples/samples.custom (any separator would end up inside the values).
CUSTOM_FIELD_MARKERS = "user_id=|feature_9=|feature_10="
rdd = sc.textFile("samples/samples.custom").map(
    lambda text: re.split(CUSTOM_FIELD_MARKERS, text)[1:]
)


def _custom_fields_to_row(fields):
    """Wrap the three values parsed from one samples.custom line in a Row."""
    return Row(user_id=fields[0], feature_9=fields[1], feature_10=fields[2])


# One Row per record; Spark infers the (string) schema from the Rows.
feat_9_10_df = rdd.map(_custom_fields_to_row).toDF()

In [10]:
# The RDD API is reached through the SparkContext that backs the session.
sc = spark.sparkContext

# Raw lines of the labels file.
rdd = sc.textFile("samples/samples.tsv")

# Tab-separated fields of every line.
# NOTE(review): assumes each line has at least two fields and the file has no
# header row — TODO confirm against samples/samples.tsv.
label_rdd = rdd.map(lambda text: text.split("\t"))


def _label_fields_to_row(fields):
    """Wrap the first two tab-separated fields of a labels line in a Row."""
    # NOTE(review): the first field (presumably the user identifier) is stored as
    # `user_name`, whereas the other frames key on `user_id`; a rename will be
    # needed before joining labels. Left unchanged in case later cells rely on it.
    return Row(user_name=fields[0], label=fields[1])


label_df = label_rdd.map(_label_fields_to_row).toDF()

In [82]:
# Merge the JSON features (`df`) with feature_9/feature_10 on user_id.
# The join returns a NEW frame (`dataset`); `df` itself is left unchanged, so
# everything after the join must inspect `dataset`, not `df`.
# NOTE: an inner join keeps only user_ids present in both inputs.
print('df schema')
df.printSchema()
# print('number of cols = {0}, rows={1}'.format(len(df.columns), df.count()))

# Passing the key as a list keeps a single `user_id` column in the result.
dataset = df.join(feat_9_10_df, ['user_id'], 'inner')

print('df schema after merge')
dataset.printSchema()
# sanity check (presumably disabled because count() runs a full Spark job)
# print('number of cols = {0}, rows={1}'.format(len(dataset.columns), dataset.count()))

print('first 5 observations')
dataset.show(5, truncate=True)

df schema
root
 |-- feature_1: double (nullable = true)
 |-- feature_2: string (nullable = true)
 |-- feature_3: double (nullable = true)
 |-- feature_4: double (nullable = true)
 |-- feature_5: double (nullable = true)
 |-- feature_6: long (nullable = true)
 |-- feature_7: double (nullable = true)
 |-- feature_8: double (nullable = true)
 |-- user_id: string (nullable = true)

df schema after merge
root
 |-- user_id: string (nullable = true)
 |-- feature_1: double (nullable = true)
 |-- feature_2: string (nullable = true)
 |-- feature_3: double (nullable = true)
 |-- feature_4: double (nullable = true)
 |-- feature_5: double (nullable = true)
 |-- feature_6: long (nullable = true)
 |-- feature_7: double (nullable = true)
 |-- feature_8: double (nullable = true)
 |-- feature_10: string (nullable = true)
 |-- feature_9: string (nullable = true)

first 5 observations
+--------------------+------------------+---------+-------------------+------------------+--------------------+---------+-