# HW 2: read gzipped json(s) to a dataframe

In [None]:
# Autosave the notebook every second (IPython's %autosave interval is in seconds).
%autosave 1

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import *
import typing as T
import cytoolz.curried as tz
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor

from pyspark.ml.tuning import ParamGridBuilder
import pyspark


import os

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1670332467339_0010,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


In [None]:
# Application name used to find this session in the Spark UI.
your_unique_name = "daniel_ethan_hw2"
# Refuse to run with the template's placeholder name.
if your_unique_name in ("noam_hw2",):
    raise Exception("Please use your own name")

In [None]:
# getOrCreate() returns the session the kernel already started (see the
# startup output) or creates one, tagged with our application name.
spark = (
    SparkSession.builder
    .appName(your_unique_name)
    .getOrCreate()
)

In [None]:
# Input file names inside the course blob container: test set (no bidfloor),
# training bids, and a small file presumably kept as a reference sample.
fname_test_data, fname, fname_ref = (
    "bids_20K_no_bidfloor.json",
    "2_2.gz",
    "bids_12.json",
)

In [None]:
from pyspark.sql.types import MapType

# All input files live in the same Azure blob container.
data_root = "wasbs://nc001@dacoursedatastorage.blob.core.windows.net/"

# At least one file has duplicate keys, which makes plain schema inference
# fail. So the schema is first inferred from a multiLine read of the same
# file, and then passed explicitly to the real (line-delimited) read. With an
# explicit schema, Spark can handle the duplicate keys.
# NOTE(review): ``fname_ref`` is not used here; the schema comes from the
# file being read.
inferred = spark.read.json(data_root + fname, multiLine=True)
device = inferred.select("device")
device.printSchema()
bids = spark.read.schema(inferred.schema).json(data_root + fname)

# Same two-step read for the test data.
test = spark.read.json(data_root + fname_test_data, multiLine=True)
ptest = spark.read.schema(test.schema).json(data_root + fname_test_data)

# NOTE: json() also accepts a list of paths; according to an earlier note in
# this notebook, passing a list did not work for *.gz files.





root
 |-- device: struct (nullable = true)
 |    |-- carrier: string (nullable = true)
 |    |-- connectiontype: long (nullable = true)
 |    |-- devicetype: long (nullable = true)
 |    |-- dnt: long (nullable = true)
 |    |-- dpidmd5: string (nullable = true)
 |    |-- dpidsha1: string (nullable = true)
 |    |-- geo: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- lon: double (nullable = true)
 |    |    |-- region: string (nullable = true)
 |    |    |-- type: long (nullable = true)
 |    |    |-- zip: string (nullable = true)
 |    |-- ifa: string (nullable = true)
 |    |-- ip: string (nullable = true)
 |    |-- language: string (nullable = true)
 |    |-- make: string (nullable = true)
 |    |-- model: string (nullable = true)
 |    |-- os: string (nullable = true)
 |    |-- osv: string (nullable = true)
 |    |-- ua: string (nullable = true)

# Question 1 - Flattening the data

In [None]:
def schema_to_columns(schema: pyspark.sql.types.StructType) -> T.List[T.List[str]]:
    """
    Flatten a possibly nested DataFrame schema into a list of column paths.

    Each entry is the list of field names leading from a top-level field down
    to a leaf, e.g. ``["device", "geo", "lat"]``. Struct fields are expanded
    depth-first in schema order; every other type (arrays and maps included)
    is treated as a leaf.
    """

    def walk(struct, path):
        for field in struct.fields:
            field_path = path + [field.name]
            if isinstance(field.dataType, pyspark.sql.types.StructType):
                yield from walk(field.dataType, field_path)
            else:
                yield field_path

    return list(walk(schema, []))


In [None]:
def flatten_frame(frame: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    """
    Select every leaf field of ``frame`` as its own top-level column.

    Leaves are found with ``schema_to_columns``. Top-level leaves keep their
    name; nested leaves are aliased to their path joined with ':'
    (e.g. ``device:geo:lat``).
    """

    def leaf_column(path):
        # cytoolz get_in walks frame[path[0]][path[1]]... down to the leaf.
        column = tz.get_in(path, frame)
        return column if len(path) == 1 else column.alias(":".join(path))

    return frame.select([leaf_column(path) for path in schema_to_columns(frame.schema)])

In [None]:
# Flatten the nested ``device`` struct into one column per leaf field.
flatten_data = flatten_frame(frame=device)

In [None]:
# Each leaf is now a top-level column named by its ':'-joined path.
flatten_data.printSchema()

root
 |-- device:carrier: string (nullable = true)
 |-- device:connectiontype: long (nullable = true)
 |-- device:devicetype: long (nullable = true)
 |-- device:dnt: long (nullable = true)
 |-- device:dpidmd5: string (nullable = true)
 |-- device:dpidsha1: string (nullable = true)
 |-- device:geo:city: string (nullable = true)
 |-- device:geo:country: string (nullable = true)
 |-- device:geo:lat: double (nullable = true)
 |-- device:geo:lon: double (nullable = true)
 |-- device:geo:region: string (nullable = true)
 |-- device:geo:type: long (nullable = true)
 |-- device:geo:zip: string (nullable = true)
 |-- device:ifa: string (nullable = true)
 |-- device:ip: string (nullable = true)
 |-- device:language: string (nullable = true)
 |-- device:make: string (nullable = true)
 |-- device:model: string (nullable = true)
 |-- device:os: string (nullable = true)
 |-- device:osv: string (nullable = true)
 |-- device:ua: string (nullable = true)

# Question 2 - Analyzing the data

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import pylab as pl
import math

In [None]:
# Floor price of each bid's first impression alongside its country code.
df2 = bids.select(
    bids["imp.bidfloor"][0].alias("bidfloor"),
    bids["location.countrycode"].alias("country"),
)
df2.show(n=10)

+-------------------+-------+
|           bidfloor|country|
+-------------------+-------+
|              0.564|    MEX|
|               1.08|    SAU|
|            1.13782|    IRN|
|            0.18828|    USA|
|0.09813432835820896|    BRA|
|          1.2737148|    USA|
|0.18074880000000002|    USA|
| 0.8709614159999999|    USA|
|             1.4808|    KWT|
|             1.3896|    KWT|
+-------------------+-------+
only showing top 10 rows

In [None]:
import numpy as np

# Bring every bid floor to the driver; each element wraps one collected Row,
# and null floors presumably remain None.
x = np.asarray(df2.select("bidfloor").collect())

In [None]:
# Keep only the non-null bid floors (first field of every collected row).
vals = np.array([row[0] for row in x if row[0] is not None])

In [None]:
# Bin the bid floors (np.histogram's default of 10 equal-width bins) and
# re-plot the precomputed counts as a density on a logarithmic y-axis.
counts, bins = np.histogram(vals)
plt.hist(bins[:-1], bins, weights=counts, density=True, log=True, color="b")
plt.title("Bidfloor Frequency")
# x holds the bid floor values, y the normalised frequency.
plt.xlabel("Bidfloor")
plt.ylabel("Probabilities")
plt.show()

HTML(value=u'<img src="data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAkIAAAHFCAYAAAAe+pb9AAAAOXRFWHRTb2Z0d2FyZ…

In [None]:
# Box plot of the non-null bid floors held in ``vals``.
plt.boxplot(vals)
plt.show()

HTML(value=u'<img src="data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAigAAAGdCAYAAAA44ojeAAAAOXRFWHRTb2Z0d2FyZ…

In [None]:
# Number of bids per country, keeping countries with at least 10 bids,
# most frequent first.
country_frequency = (
    df2.groupBy("country")
    .count()
    .where(col("count") >= 10)
    .orderBy(col("count").desc())
)

In [None]:
# Country codes ordered by bid count, collected to the driver as Rows.
most_commun_country = country_frequency.select(col("country")).collect()

In [None]:
# Codes of the three most frequent countries.
most_country_1, most_country_2, most_country_3 = (
    most_commun_country[rank][0] for rank in range(3)
)
# One bid-floor DataFrame per top country is appended here by the next cell.
bids_floors_for_3cc = []

In [None]:
# For each of the three most frequent countries, a single-column DataFrame of
# its bid floors. The column is named "bidfloor for <code>" and is later
# reused as a plot label.
for rank in range(3):
    code = most_commun_country[rank][0]
    floors_df = df2.select(df2["bidfloor"].alias("bidfloor for " + str(code)))
    bids_floors_for_3cc.append(floors_df.where(col("country") == code))

In [None]:
# One bid-floor histogram (density, log y-axis) per top-3 country.
for bd in bids_floors_for_3cc:
    label = bd.columns[0]  # "bidfloor for <country code>"
    floors = np.array([row[0] for row in bd.collect() if row[0] is not None])
    if floors.size == 0:
        # All-zero counts cannot be normalised into a density; nothing to plot.
        continue
    counts, bins = np.histogram(floors)
    plt.hist(bins[:-1], bins, weights=counts, density=True, log=True)
    plt.title("Bidfloor Frequency - " + label)
    plt.ylabel("Probabilities")
    plt.xlabel("bidfloor")
    plt.show()


# Question 3 – Feature selection

Features we selected from the dataset : 

* bidfloor : double

* country code : string 

    The bidfloor is influenced by the country, as we saw in the previous question.

* Device-geo :    
  * latitude : double

  * longitude : double
    
    The bidfloor price may be influenced by the device's geographical position.

* Imp :
  * h : long

  * pos : long

  * w : long

  * position : long

    They describe the format and position of the ad on the screen; we expect the bidfloor to vary with the quality and position of the ad.

* connection type : long
    
    The bidfloor price may be influenced by the device's connection type.
  

* timestamp : string
   
    For each bid the timestamp is unique; we convert it to seconds since the epoch so that it can be used as a numeric feature in our predictions.

# Question 4 - Creating the model

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import DecisionTreeRegressor

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.sql.functions import to_timestamp

def pre_process(data, indexer=False, a=True):
    """
    Select, clean and encode the model columns of a raw bids DataFrame.

    Two modes:

    * Training (``indexer`` is ``False``, the default, or ``None``): the
      ``bidfloor`` label is kept, rows with a null in any selected column are
      dropped, and a new ``StringIndexer`` is fitted on ``country``.
    * Inference (``indexer`` is a fitted indexer model, e.g. the one returned
      by a training call): no label, no rows dropped, and the given model is
      reused with ``handleInvalid="keep"``. Countries unseen in training get
      an extra index instead of raising.

    Parameters
    ----------
    data : pyspark.sql.DataFrame
        Raw bids with ``imp``, ``device``, ``location`` and ``timestamp``.
    indexer : bool, None or StringIndexerModel
        ``False``/``None`` to fit a new country indexer, otherwise the model
        to reuse. NOTE: the passed model is mutated by
        ``setHandleInvalid("keep")``.
    a : bool
        Unused; kept only so existing callers keep working.

    Returns
    -------
    tuple
        ``(processed_frame, country_indexer_model)``. The column order is
        ``[bidfloor,] h, pos, w, imp_pos, latitude, longitude, ConexionType,
        epoch_seconds, country_id``.
    """
    training = indexer is None or indexer is False

    # ``imp`` is an array of impressions; only the first one is used.
    columns = []
    if training:
        columns.append(data["imp.bidfloor"][0].alias("bidfloor"))
    columns += [
        data["imp.banner.h"][0].alias("h"),
        data["imp.banner.pos"][0].alias("pos"),
        data["imp.banner.w"][0].alias("w"),
        data["imp.position"][0].alias("imp_pos"),
        data["device.geo.lat"].alias("latitude"),
        data["device.geo.lon"].alias("longitude"),
        # The schema field is ``connectiontype``; this lookup relies on Spark
        # resolving names case-insensitively (spark.sql.caseSensitive=false).
        data["device.connectionType"].alias("ConexionType"),
        data["timestamp"].alias("timestamp"),
        data["location.countrycode"].alias("country"),
    ]
    data = data.select(columns)

    if training:
        # Only training rows are dropped: every test row needs a prediction.
        data = data.na.drop()

    # Replace the string timestamp by seconds since the Unix epoch.
    data = data.withColumn(
        "epoch_seconds", to_timestamp(col("timestamp")).cast("long")
    ).drop("timestamp")

    if training:
        indexer_country = StringIndexer(
            inputCol="country", outputCol="country_id"
        ).fit(data)
    else:
        indexer_country = indexer
        indexer_country.setHandleInvalid("keep")

    data = indexer_country.transform(data).drop("country")
    return data, indexer_country

In [None]:
# Fit the country indexer on the training bids, then reuse it on the test bids.
dataset, indexers = pre_process(bids)
test_preprocess, i = pre_process(ptest, indexer=indexers)

In [None]:
# Every processed column except the label becomes a model input.
features = dataset.drop("bidfloor")
assembler = VectorAssembler(inputCols=features.columns, outputCol="features")
train_set = assembler.transform(dataset).select("features", "bidfloor")

# Test rows were not null-filtered, so keep invalid values rather than failing.
assembler.setHandleInvalid("keep")
test_set = assembler.transform(test_preprocess).select("features")

In [None]:
# Features with at most 121 distinct values are treated as categorical;
# categories unseen during fitting go to an extra bucket at transform time.
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=121,
    handleInvalid="keep",
).fit(train_set)

In [None]:
# maxBins must be at least the largest categorical arity, which the
# VectorIndexer caps at its maxCategories (121 here).
rdg = RandomForestRegressor(
    featuresCol="indexedFeatures",
    labelCol="bidfloor",
    maxBins=121,
)

# featureIndexer is already fitted, so it runs as a plain transformer stage.
pipeline = Pipeline(stages=[featureIndexer, rdg])
model = pipeline.fit(train_set)

In [None]:
# Score the test features with the fitted pipeline.
prediction = model.transform(dataset=test_set)

In [None]:
# Copy the model output into the ``y_pred`` column that is written out.
pred = prediction.withColumn("y_pred", col("prediction"))

In [None]:
# Keep only the predictions.
final_pred = pred.select(col("y_pred"))

In [None]:
# Write the predictions as CSV files with a header row.
final_pred.write.csv("q4_931202543_345123624", header=True)

# Question 5 – Empirical Experiments

In [None]:
from pyspark.ml.stat import Correlation

# Correlation.corr works on a single vector column, so pack every processed
# column (the bidfloor label included) into one.
assembler = VectorAssembler(inputCols=dataset.columns, outputCol="corr_features")
df_vector = assembler.transform(dataset).select(col("corr_features"))

# Pairwise Pearson correlation (Correlation.corr's default method).
corr_matrix = Correlation.corr(dataset=df_vector, column="corr_features")

In [None]:
# Correlation.corr returns a one-row DataFrame holding a DenseMatrix.
# toArray() gives the square numpy array directly, instead of reshaping the
# flat, column-major ``values`` with a hard-coded width.
matrix = corr_matrix.collect()[0]["pearson({})".format("corr_features")].toArray()
print(matrix)

[[ 1.00000000e+00  1.99727632e-01  3.45655088e-01  3.58814500e-02
   3.45655088e-01  8.34969205e-02  1.23017507e-01  9.75055668e-02
   1.99682408e-03  3.33544346e-02]
 [ 1.99727632e-01  1.00000000e+00  3.16253372e-01  3.95564061e-03
   3.16253372e-01 -6.76726572e-02  1.00347141e-01 -5.44731025e-03
  -1.13770519e-02  1.51907584e-01]
 [ 3.45655088e-01  3.16253372e-01  1.00000000e+00  5.36801259e-02
   1.00000000e+00 -4.46761479e-03 -8.42150019e-03  5.54441344e-02
  -1.43272155e-02  6.41060232e-02]
 [ 3.58814500e-02  3.95564061e-03  5.36801259e-02  1.00000000e+00
   5.36801259e-02  8.85154269e-04 -3.07608774e-02 -3.32624782e-02
   8.83941249e-04 -1.86380547e-02]
 [ 3.45655088e-01  3.16253372e-01  1.00000000e+00  5.36801259e-02
   1.00000000e+00 -4.46761479e-03 -8.42150019e-03  5.54441344e-02
  -1.43272155e-02  6.41060232e-02]
 [ 8.34969205e-02 -6.76726572e-02 -4.46761479e-03  8.85154269e-04
  -4.46761479e-03  1.00000000e+00 -1.80342086e-02  2.52663725e-01
   3.85026316e-03 -1.66672601e-01

In [None]:
# For each row of the correlation matrix, the feature indices sorted by
# increasing correlation (most correlated last). A stable sort makes ties,
# e.g. two perfectly correlated columns, keep their index order.
res_mat = []

for row in matrix:
    order = np.argsort(row, kind="stable")
    print(order)
    res_mat.append(order)

[8 9 3 5 7 6 1 2 4 0]
[5 8 7 3 6 9 0 2 4 1]
[8 6 5 3 7 9 1 0 2 4]
[7 6 9 8 5 1 0 2 4 3]
[8 6 5 3 7 9 1 0 2 4]
[9 1 6 2 4 3 8 0 7 5]
[3 5 2 4 8 1 0 9 7 6]
[3 1 8 2 4 0 9 5 6 7]
[2 4 1 9 3 0 5 7 6 8]
[5 3 8 0 2 4 7 1 6 9]

In [None]:
# Average rank of each feature counted from the most-correlated end of every
# row ranking (0 = most correlated in that row). A lower score means the
# feature is, on average, more strongly correlated with the others.
n_rows = len(res_mat)
final = {}
for ranking in res_mat:
    for points, feature in enumerate(ranking[::-1]):
        final[feature] = final.get(feature, 0) + points
for i in range(n_rows):
    print(i, ":", final[i]/n_rows)

0 : 2.9
1 : 4.2
2 : 4.1
3 : 6.0
4 : 3.1
5 : 5.6
6 : 4.3
7 : 3.8
8 : 6.3
9 : 4.7

In [None]:
# Peek at the two columns at positions 4 and 7 of the processed dataset.
dataset.select(dataset.columns[4], dataset.columns[7]).show()

+-------+------------+
|imp_pos|ConexionType|
+-------+------------+
|      0|           2|
|      0|           6|
|      1|           3|
|      0|           0|
|      0|           2|
|      0|           3|
|      0|           3|
|      7|           3|
|      0|           3|
|      1|           2|
|      0|           3|
|      1|           6|
|      0|           2|
|      0|           2|
|      0|           2|
|      0|           6|
|      0|           2|
|      1|           2|
|      0|           3|
|      1|           6|
+-------+------------+
only showing top 20 rows

We chose to use the correlation matrix method.

We can easily see that features 4 and 7 are the most significant: apart from feature 0 (the bidfloor label itself), they have the lowest average scores.
 
Feature 4 corresponds to the impression position (`imp_pos`) and feature 7 to the connection type (`ConexionType`).