In [1]:
#!pip install pyspark

In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('Practise').getOrCreate()
spark

In [4]:
df = spark.read.csv('train.csv', header=True, inferSchema=True)

In [5]:
n,p = (df.count(), len(df.columns))
print(n,p)
df.dtypes

25000 24


[('id', 'string'),
 ('AP', 'string'),
 ('creation_date_answer', 'timestamp'),
 ('situation', 'int'),
 ('ctc', 'string'),
 ('location', 'int'),
 ('gc_id', 'int'),
 ('gc_label', 'string'),
 ('creation_date_global', 'timestamp'),
 ('id_group', 'string'),
 ('id_group_2', 'string'),
 ('favorite_fruit', 'string'),
 ('fruit_situation_id', 'int'),
 ('fruit_situation_label', 'string'),
 ('fruits_or_vegetables', 'string'),
 ('number_of_fruit', 'int'),
 ('id_group_3', 'string'),
 ('creation_date_request', 'timestamp'),
 ('hobby', 'string'),
 ('id_group_4', 'string'),
 ('ville', 'string'),
 ('green_vegetables', 'string'),
 ('vegetable_type', 'string'),
 ('target', 'int')]

In [6]:
# Find count for empty, None, Null, Nan with string literals.
from pyspark.sql.functions import col,isnan,when,count
from pyspark.sql import functions as F

print("Suppression des colonnes NA")

to_delete=[]

nan_count = df.select(*[(
        F.count(F.when((F.isnan(c) | F.col(c).isNull()), c)) if t not in ("timestamp", "date")
        else F.count(F.when(F.col(c).isNull(), c))
    ).alias(c)
    for c, t in df.dtypes if c in df.columns
])

for l in df.columns:
    count_NA = nan_count.collect()[0][l] 
    if ( count_NA > 0 ):
        print(l+ ":  " + str(count_NA))
        to_delete.append(l)
        df = df.drop(l)

#df.drop(label(to_delete))

Suppression des colonnes NA
ctc:  15380
fruit_situation_label:  1728
fruits_or_vegetables:  17341
ville:  16912
vegetable_type:  24586


In [7]:
print("Suppression des colonnes non exploitables")

df = df.drop("id","id_group","id_group_2","id_group_3","id_group_4")

Suppression des colonnes non exploitables


In [8]:
print("Colonnes contenant même info: gc_id & gc_label")
#df.groupBy("gc_id").count().sort('count').show()
#df.groupBy("gc_label").count().sort('count').show()
print("On en supprime une des deux: gc_label")
df=df.drop('gc_label');

Colonnes contenant même info: gc_id & gc_label
On en supprime une des deux: gc_label


In [9]:
from pyspark.sql.functions import year, month, dayofmonth, hour

df = df.select(*df.columns,year(df.creation_date_answer).alias('year_creation_date_answer'), 
                month(df.creation_date_answer).alias('month_creation_date_answer'),
                dayofmonth(df.creation_date_answer).alias('day_creation_date_answer'), 
                hour(df.creation_date_answer).alias('hour_creation_date_answer'),
                year(df.creation_date_global).alias('year_creation_date_global'), 
                month(df.creation_date_global).alias('month_creation_date_global'),
                dayofmonth(df.creation_date_global).alias('day_creation_date_global'), 
                hour(df.creation_date_global).alias('hour_creation_date_global'),
                year(df.creation_date_request).alias('year_creation_date_request'), 
                month(df.creation_date_request).alias('month_creation_date_request'),
                dayofmonth(df.creation_date_request).alias('day_creation_date_request'), 
                hour(df.creation_date_request).alias('hour_creation_date_request'),
               )

df=df.drop("creation_date_answer","creation_date_global","creation_date_request");

#df.select("year_creation_date_answer","month_creation_date_answer","day_creation_date_answer","hour_creation_date_answer").show()

In [10]:
for label in df.columns:
    count_occ = df.select(label).distinct().count()
    print(label+": "+str(count_occ))
    if(count_occ == 1) : df=df.drop(label) 

AP: 2
situation: 8
location: 99
gc_id: 12
favorite_fruit: 2
fruit_situation_id: 22
number_of_fruit: 9
hobby: 3
green_vegetables: 2
target: 4
year_creation_date_answer: 1
month_creation_date_answer: 3
day_creation_date_answer: 31
hour_creation_date_answer: 24
year_creation_date_global: 9
month_creation_date_global: 12
day_creation_date_global: 31
hour_creation_date_global: 24
year_creation_date_request: 1
month_creation_date_request: 3
day_creation_date_request: 31
hour_creation_date_request: 24


In [11]:
n,p = (df.count(), len(df.columns))
print(n,p)
df.dtypes

25000 20


[('AP', 'string'),
 ('situation', 'int'),
 ('location', 'int'),
 ('gc_id', 'int'),
 ('favorite_fruit', 'string'),
 ('fruit_situation_id', 'int'),
 ('number_of_fruit', 'int'),
 ('hobby', 'string'),
 ('green_vegetables', 'string'),
 ('target', 'int'),
 ('month_creation_date_answer', 'int'),
 ('day_creation_date_answer', 'int'),
 ('hour_creation_date_answer', 'int'),
 ('year_creation_date_global', 'int'),
 ('month_creation_date_global', 'int'),
 ('day_creation_date_global', 'int'),
 ('hour_creation_date_global', 'int'),
 ('month_creation_date_request', 'int'),
 ('day_creation_date_request', 'int'),
 ('hour_creation_date_request', 'int')]

In [12]:
#ONE HOT ENCODING:

str_lab=[]

for lab in df.dtypes:
    if(lab[1]=="string"): str_lab.append(lab[0])
        
for l in str_lab:
    col = df.select(l).distinct().collect()
    col = [item for sublist in col for item in sublist]
    print(col)
    for i in range(len(col)-1):
        df=df.withColumn(l+"_"+str(i), (df[l]==col[i]).cast("int"))
    df=df.drop(l)

['f', 't']
['clementine', 'poire']
['noball', 'football', 'volleyball']
['f', 't']


In [13]:
#ORDINAL ENCODING

'''
from pyspark.ml.feature import StringIndexer

str_lab = []
str_lab_idx = []
for lab in df.dtypes:
    if(lab[1]=="string"): 
        str_lab.append(lab[0])
        str_lab_idx.append(lab[0]+"_idx")                

print(str_lab)
        
indexer = StringIndexer(inputCols = str_lab, outputCols = str_lab_idx)

df = indexer.fit(df).transform(df)

for s in str_lab: df = df.drop(s)
'''

'\nfrom pyspark.ml.feature import StringIndexer\n\nstr_lab = []\nstr_lab_idx = []\nfor lab in df.dtypes:\n    if(lab[1]=="string"): \n        str_lab.append(lab[0])\n        str_lab_idx.append(lab[0]+"_idx")                \n\nprint(str_lab)\n        \nindexer = StringIndexer(inputCols = str_lab, outputCols = str_lab_idx)\n\ndf = indexer.fit(df).transform(df)\n\nfor s in str_lab: df = df.drop(s)\n'

In [14]:
n,p = (df.count(), len(df.columns))
print(n,p)
df.printSchema()

25000 21
root
 |-- situation: integer (nullable = true)
 |-- location: integer (nullable = true)
 |-- gc_id: integer (nullable = true)
 |-- fruit_situation_id: integer (nullable = true)
 |-- number_of_fruit: integer (nullable = true)
 |-- target: integer (nullable = true)
 |-- month_creation_date_answer: integer (nullable = true)
 |-- day_creation_date_answer: integer (nullable = true)
 |-- hour_creation_date_answer: integer (nullable = true)
 |-- year_creation_date_global: integer (nullable = true)
 |-- month_creation_date_global: integer (nullable = true)
 |-- day_creation_date_global: integer (nullable = true)
 |-- hour_creation_date_global: integer (nullable = true)
 |-- month_creation_date_request: integer (nullable = true)
 |-- day_creation_date_request: integer (nullable = true)
 |-- hour_creation_date_request: integer (nullable = true)
 |-- AP_0: integer (nullable = true)
 |-- favorite_fruit_0: integer (nullable = true)
 |-- hobby_0: integer (nullable = true)
 |-- hobby_1: inte

In [15]:
df.toPandas().to_csv("train_clean_pyspark.csv", header=True, index=True)

In [16]:
from pyspark.ml.feature import VectorAssembler

featureAssembler = VectorAssembler(inputCols=df.select([c for c in df.columns if c not in {'target'}]).columns, outputCol = "X")
data = featureAssembler.transform(df)
data = data.select("X","target")
data = data.withColumnRenamed("target", "y")
data.columns
data.show()

+--------------------+---+
|                   X|  y|
+--------------------+---+
|[-1.0,52.0,70.0,1...|  0|
|[-1.0,78.0,10.0,1...|  1|
|[-1.0,70.0,10.0,2...|  0|
|[-1.0,84.0,10.0,1...|  1|
|[-1.0,29.0,20.0,1...|  1|
|[-1.0,32.0,10.0,2...|  0|
|[-1.0,96.0,30.0,2...|  0|
|[-1.0,43.0,10.0,1...|  0|
|[-1.0,50.0,10.0,1...|  1|
|[-1.0,47.0,10.0,1...|  0|
|[-1.0,3.0,10.0,10...|  1|
|[-1.0,60.0,10.0,2...|  0|
|[-1.0,70.0,40.0,2...|  0|
|[-1.0,68.0,10.0,2...|  0|
|[-1.0,95.0,100.0,...|  3|
|[-1.0,95.0,20.0,1...|  0|
|[-1.0,8.0,70.0,10...|  1|
|[-1.0,70.0,10.0,2...|  1|
|[-1.0,78.0,20.0,2...|  1|
|[-1.0,34.0,30.0,2...|  0|
+--------------------+---+
only showing top 20 rows



In [17]:
data.groupBy("y").count().sort('y').show()

+---+-----+
|  y|count|
+---+-----+
|  0|12077|
|  1| 8816|
|  2| 3874|
|  3|  233|
+---+-----+



In [18]:
from pyspark.ml.regression import LinearRegression

train_data, test_data = data.randomSplit([0.75,0.25])
clf_reg = LinearRegression(featuresCol="X", labelCol="y").fit(train_data)
#...