### Surnames and Given Names:

Lettere Dragosavljevich Mathias Giuseppe

### Date:

05-10-2023

# **Data Preprocessing with PySpark**


## Google Colab Setup

If you are going to use Google Colab instead of a Spark cluster, run the following code to install Java and Apache Spark.

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
# If the following links don't work, update them to the latest Apache Spark release (older releases are kept at https://archive.apache.org/dist/spark/)
!wget -q https://dlcdn.apache.org/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

## Setup


In [4]:
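# Installing required packages
# Note: this installs the latest pyspark from PyPI (3.5.0 in the log below), which differs
# from the 3.4.1 distribution in SPARK_HOME; pinning pyspark==3.4.1 would keep them consistent
!pip install pyspark
!pip install findspark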

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 316.9/316.9 MB 3.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=5828836cfac081be7f12a72fa4cba3fdcfbc4fcf73290f7df8d119e235e7eac8
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [5]:
import findspark
findspark.init()

In [6]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

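#### Creating the Spark context and session


In [7]:
# Creating a SparkContext (optional: SparkSession.builder.getOrCreate() creates one if needed)
sc = SparkContext()

# Creating a Spark session; getOrCreate() reuses the SparkContext created above
# "spark.some.config.option" is a placeholder key copied from the Spark documentation examples
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()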

#### Display the Spark session



In [8]:
spark

## Exercise 2 - Load the data into a Spark DataFrame


## Upload the dataset from your local system to the Colab working directory


In [9]:
from google.colab import files
files.upload()


In [10]:
# Read the CSV with a header row, inferring column types and treating 'NA' strings as nulls
dfsFlight = spark.read.csv("flights-larger.csv", header=True, inferSchema=True, nullValue='NA')
# printSchema() prints the schema itself (it returns None)
dfsFlight.printSchema()

root
 |-- mon: integer (nullable = true)
 |-- dom: integer (nullable = true)
 |-- dow: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- org: string (nullable = true)
 |-- mile: integer (nullable = true)
 |-- depart: double (nullable = true)
 |-- duration: integer (nullable = true)
 |-- delay: integer (nullable = true)


## Preprocessing




In [11]:
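# Note: importing max and min like this shadows Python's built-in max/min; mode requires Spark 3.4+
from pyspark.sql.functions import col, isnan, when, count, isnull, max, min, mode, lit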

In [12]:
# Count, per column, the values that are null or a single blank space
res = [count(when((col(c) == ' ') | col(c).isNull(), c)).alias(c) for c in dfsFlight.columns]
dfsFlight.select(*res).show()
# Alternative (unused):
# def check_for_null_or_nan(df):
#     null_or_nan = lambda x: isnan(x) | isnull(x)
#     func = lambda x: df.filter(null_or_nan(x)).count()
#     print(*[f'{i} has {func(i)} nans/nulls' for i in df.columns if func(i) != 0], sep='\n')

+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
|  0|  0|  0|      0|     0|  0|   0|     0|       0|16711|
+---+---+---+-------+------+---+----+------+--------+-----+
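The null check works because `when(...)` has no `otherwise`, so it returns null for rows that do not match. `count` skips nulls and therefore counts only the matching rows. Below is a plain-Python sketch of the same per-column logic. It needs no Spark, and the rows and the helper `count_null_or_blank` are hypothetical, for illustration only:

```python
def count_null_or_blank(rows, columns):
    """Count, per column, values that are None (Spark null) or a single space."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            value = row.get(c)
            if value is None or value == " ":
                counts[c] += 1
    return counts


# Hypothetical rows; in the CSV, 'NA' is already turned into null by nullValue='NA'
sample_rows = [
    {"carrier": "OO", "delay": 27},
    {"carrier": "B6", "delay": None},
    {"carrier": " ", "delay": None},
]
result = count_null_or_blank(sample_rows, ["carrier", "delay"])
print(result)  # → {'carrier': 1, 'delay': 2}
```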



In [13]:
max_value = dfsFlight.select(max('delay')).collect()[0][0]
min_value = dfsFlight.select(min('delay')).collect()[0][0]

print("Maximum Value:", max_value)
print("Minimum Value:", min_value)

Maximum Value: 1370
Minimum Value: -80


In [14]:
# Replace nulls in 'delay' with the mode (most frequent value) of that column
modDel = dfsFlight.agg(mode('delay')).collect()[0][0]

dfsClean = dfsFlight.fillna({'delay': modDel})

In [15]:
res = [count(when((col(c) == ' ')|( col(c).isNull()), c)).alias(c) for c in dfsClean.columns]
dfsClean.select(*res).show()

+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
|  0|  0|  0|      0|     0|  0|   0|     0|       0|    0|
+---+---+---+-------+------+---+----+------+--------+-----+
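Mode imputation replaces each missing value with the most frequent observed value. Here is a minimal plain-Python sketch on a small hypothetical list of delays. The helper `fill_with_mode` is illustrative only. It breaks ties by keeping the first value seen, which is not guaranteed to match how Spark's `mode` breaks ties:

```python
from collections import Counter


def fill_with_mode(values):
    """Replace None entries with the mode of the non-null values."""
    non_null = [v for v in values if v is not None]
    # most_common(1) -> [(value, count)]; ties keep first-seen order here,
    # which is not necessarily how Spark breaks ties
    mode_value = Counter(non_null).most_common(1)[0][0]
    filled = [mode_value if v is None else v for v in values]
    return filled, mode_value


delays = [-7, 0, 27, 0, None, 60, None, 0]  # hypothetical sample
filled, mode_value = fill_with_mode(delays)
print(mode_value)  # → 0
print(filled)      # → [-7, 0, 27, 0, 0, 60, 0, 0]
```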



### Miles to kilometers


In [16]:
dfsClean = dfsClean.withColumn("km", col("mile") * lit(1.60934))

dfsClean = dfsClean.drop("mile")

dfsClean.show()

+---+---+---+-------+------+---+------+--------+-----+----------+
|mon|dom|dow|carrier|flight|org|depart|duration|delay|        km|
+---+---+---+-------+------+---+------+--------+-----+----------+
| 10| 10|  1|     OO|  5836|ORD|  8.18|      51|   27| 252.66638|
|  1|  4|  1|     OO|  5866|ORD|  15.5|     102|   -7| 749.95244|
| 11| 22|  1|     OO|  6016|ORD|  7.17|     127|  -19|1187.69292|
|  2| 14|  5|     B6|   199|JFK| 21.17|     365|   60|3617.79632|
|  5| 25|  3|     WN|  1675|SJC| 12.92|      85|   22| 621.20524|
|  3| 28|  1|     B6|   377|LGA| 13.33|     182|   70|1731.64984|
|  5| 28|  6|     B6|   904|ORD|  9.58|     130|   47| 1190.9116|
|  1| 19|  2|     UA|   820|SFO| 12.75|     123|  135|1092.74186|
|  8|  5|  5|     US|  2175|LGA|  13.0|      71|  -10| 344.39876|
|  5| 27|  5|     AA|  1240|ORD| 14.42|     195|  -11|1926.37998|
|  8| 20|  6|     B6|   119|JFK| 14.67|     198|   20|1902.23988|
|  2|  3|  1|     AA|  1881|JFK| 15.92|     200|   -9| 1754.1806|
|  8| 26| 
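A quick check of the conversion factor: the international mile is exactly 1.609344 km, and the notebook uses the rounded value 1.60934. Dividing the `km` values shown above by that factor recovers whole-mile distances: 157 miles for the first row and 2248 miles for the JFK row. A short sketch of that arithmetic (`MILE_IN_KM` and `miles_to_km` are illustrative names):

```python
MILE_IN_KM = 1.60934  # factor used in the notebook (exact value: 1.609344)


def miles_to_km(miles):
    return miles * MILE_IN_KM


# Recover the mile distances behind two km values shown above
print(round(252.66638 / MILE_IN_KM, 6))   # → 157.0
print(round(3617.79632 / MILE_IN_KM, 6))  # → 2248.0
print(round(miles_to_km(157), 5))         # → 252.66638
```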

## Indexing



In [17]:
from pyspark.ml.feature import StringIndexer

In [18]:
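# Index 'carrier' and 'org' into 'carrier_idx' and 'org_idx'.
# Note: 'indexer' holds the transformed DataFrame (not the StringIndexer model),
# so the dfsClean.show() below still shows the columns before indexing.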
indexer = StringIndexer(inputCols=['carrier', 'org'],
                        outputCols=['carrier_idx', 'org_idx']).fit(dfsClean).transform(dfsClean)
dfsClean.show()

+---+---+---+-------+------+---+------+--------+-----+----------+
|mon|dom|dow|carrier|flight|org|depart|duration|delay|        km|
+---+---+---+-------+------+---+------+--------+-----+----------+
| 10| 10|  1|     OO|  5836|ORD|  8.18|      51|   27| 252.66638|
|  1|  4|  1|     OO|  5866|ORD|  15.5|     102|   -7| 749.95244|
| 11| 22|  1|     OO|  6016|ORD|  7.17|     127|  -19|1187.69292|
|  2| 14|  5|     B6|   199|JFK| 21.17|     365|   60|3617.79632|
|  5| 25|  3|     WN|  1675|SJC| 12.92|      85|   22| 621.20524|
|  3| 28|  1|     B6|   377|LGA| 13.33|     182|   70|1731.64984|
|  5| 28|  6|     B6|   904|ORD|  9.58|     130|   47| 1190.9116|
|  1| 19|  2|     UA|   820|SFO| 12.75|     123|  135|1092.74186|
|  8|  5|  5|     US|  2175|LGA|  13.0|      71|  -10| 344.39876|
|  5| 27|  5|     AA|  1240|ORD| 14.42|     195|  -11|1926.37998|
|  8| 20|  6|     B6|   119|JFK| 14.67|     198|   20|1902.23988|
|  2|  3|  1|     AA|  1881|JFK| 15.92|     200|   -9| 1754.1806|
|  8| 26| 

In [19]:
dfsClean = indexer
dfsClean.show()

+---+---+---+-------+------+---+------+--------+-----+----------+-----------+-------+
|mon|dom|dow|carrier|flight|org|depart|duration|delay|        km|carrier_idx|org_idx|
+---+---+---+-------+------+---+------+--------+-----+----------+-----------+-------+
| 10| 10|  1|     OO|  5836|ORD|  8.18|      51|   27| 252.66638|        2.0|    0.0|
|  1|  4|  1|     OO|  5866|ORD|  15.5|     102|   -7| 749.95244|        2.0|    0.0|
| 11| 22|  1|     OO|  6016|ORD|  7.17|     127|  -19|1187.69292|        2.0|    0.0|
|  2| 14|  5|     B6|   199|JFK| 21.17|     365|   60|3617.79632|        4.0|    2.0|
|  5| 25|  3|     WN|  1675|SJC| 12.92|      85|   22| 621.20524|        3.0|    5.0|
|  3| 28|  1|     B6|   377|LGA| 13.33|     182|   70|1731.64984|        4.0|    3.0|
|  5| 28|  6|     B6|   904|ORD|  9.58|     130|   47| 1190.9116|        4.0|    0.0|
|  1| 19|  2|     UA|   820|SFO| 12.75|     123|  135|1092.74186|        0.0|    1.0|
|  8|  5|  5|     US|  2175|LGA|  13.0|      71|  -10|
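`StringIndexer` uses `stringOrderType="frequencyDesc"` by default, so the most frequent label gets index 0.0. This is why ORD maps to `org_idx` 0.0. According to the Spark documentation, labels with equal frequency are ordered alphabetically. Below is a plain-Python sketch of that ordering, using a hypothetical helper `string_index` and made-up sample labels:

```python
from collections import Counter


def string_index(labels):
    """Most frequent label -> 0.0; equal frequencies ordered alphabetically."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


sample = ["ORD", "SFO", "ORD", "JFK", "SFO", "ORD", "LGA"]  # made-up labels
mapping = string_index(sample)
print(mapping)  # → {'ORD': 0.0, 'SFO': 1.0, 'JFK': 2.0, 'LGA': 3.0}
```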

## Bucketing the departure time variable

In [20]:
from pyspark.ml.feature import Bucketizer

In [21]:
buckets = Bucketizer(splits=[0,3,6,9,12,15,18,21,24],
                     inputCol="depart",
                     outputCol="depart_bucket")

dfsClean = buckets.transform(dfsClean)
dfsClean.select('depart', 'depart_bucket').show(5)

+------+-------------+
|depart|depart_bucket|
+------+-------------+
|  8.18|          2.0|
|  15.5|          5.0|
|  7.17|          2.0|
| 21.17|          7.0|
| 12.92|          4.0|
+------+-------------+
only showing top 5 rows
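With `splits=[0,3,...,24]`, `Bucketizer` assigns a value x to bucket i when `splits[i] <= x < splits[i+1]`, and the last bucket also includes 24. Below is a plain-Python sketch using `bisect`; the helper `bucketize` is hypothetical. Spark treats values outside the splits as errors, and the sketch mimics this with a `ValueError`:

```python
from bisect import bisect_right

SPLITS = [0, 3, 6, 9, 12, 15, 18, 21, 24]


def bucketize(x, splits=SPLITS):
    """Bucket i covers [splits[i], splits[i+1]); the last bucket includes its upper bound."""
    if not splits[0] <= x <= splits[-1]:
        raise ValueError(f"{x} is outside [{splits[0]}, {splits[-1]}]")
    if x == splits[-1]:
        return float(len(splits) - 2)
    return float(bisect_right(splits, x) - 1)


buckets = [bucketize(d) for d in [8.18, 15.5, 7.17, 21.17, 12.92]]
print(buckets)  # → [2.0, 5.0, 2.0, 7.0, 4.0], matching depart_bucket above
```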



In [22]:
dfsClean.groupby('depart_bucket').count().show()

+-------------+-----+
|depart_bucket|count|
+-------------+-----+
|          0.0|  206|
|          7.0|19128|
|          1.0|  705|
|          4.0|51955|
|          3.0|50866|
|          2.0|47684|
|          6.0|51940|
|          5.0|52516|
+-------------+-----+



## One-Hot Encoding and Consolidation

In [30]:
from pyspark.ml.feature import OneHotEncoder

In [31]:
onehotDep = OneHotEncoder(inputCols=['depart_bucket'], outputCols=['depart_dummy'])
onehotOrg = OneHotEncoder(inputCols=['org_idx'], outputCols=['org_dummy'])

In [32]:
onehotDep = onehotDep.fit(dfsClean)
onehotOrg = onehotOrg.fit(dfsClean)

print("Departure categories: ", onehotDep.categorySizes)
print("Org categories: ", onehotOrg.categorySizes)

Departure categories:  [8]
Org categories:  [8]


In [33]:
dfsClean = onehotDep.transform(dfsClean)
dfsClean.select("depart", "depart_bucket", "depart_dummy").distinct().sort("depart_bucket").show()

+------+-------------+-------------+
|depart|depart_bucket| depart_dummy|
+------+-------------+-------------+
|  1.08|          0.0|(7,[0],[1.0])|
|   1.0|          0.0|(7,[0],[1.0])|
|  1.85|          0.0|(7,[0],[1.0])|
|  0.83|          0.0|(7,[0],[1.0])|
|  0.75|          0.0|(7,[0],[1.0])|
|  0.25|          0.0|(7,[0],[1.0])|
|  0.67|          0.0|(7,[0],[1.0])|
|  0.12|          0.0|(7,[0],[1.0])|
|  0.42|          0.0|(7,[0],[1.0])|
|  1.42|          0.0|(7,[0],[1.0])|
|  5.28|          1.0|(7,[1],[1.0])|
|  5.58|          1.0|(7,[1],[1.0])|
|  5.67|          1.0|(7,[1],[1.0])|
|   5.0|          1.0|(7,[1],[1.0])|
|  5.92|          1.0|(7,[1],[1.0])|
|  4.42|          1.0|(7,[1],[1.0])|
|  5.83|          1.0|(7,[1],[1.0])|
|  5.75|          1.0|(7,[1],[1.0])|
|   5.5|          1.0|(7,[1],[1.0])|
|  5.17|          1.0|(7,[1],[1.0])|
+------+-------------+-------------+
only showing top 20 rows



In [34]:
dfsClean = onehotOrg.transform(dfsClean)
dfsClean.select("org", "org_idx", "org_dummy").distinct().sort("org_idx").show()

+---+-------+-------------+
|org|org_idx|    org_dummy|
+---+-------+-------------+
|ORD|    0.0|(7,[0],[1.0])|
|SFO|    1.0|(7,[1],[1.0])|
|JFK|    2.0|(7,[2],[1.0])|
|LGA|    3.0|(7,[3],[1.0])|
|SMF|    4.0|(7,[4],[1.0])|
|SJC|    5.0|(7,[5],[1.0])|
|TUS|    6.0|(7,[6],[1.0])|
|OGG|    7.0|    (7,[],[])|
+---+-------+-------------+
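`OneHotEncoder` drops the last category by default (`dropLast=True`). As a result, 8 categories produce 7-element sparse vectors, and the last index (OGG, `org_idx` 7.0) becomes the all-zeros vector `(7,[],[])`. Dropping one level keeps the dummy columns from being perfectly collinear with the regression intercept. Below is a plain-Python sketch of the encoding. It returns `(size, indices, values)` tuples to mirror the printed sparse vectors, and `one_hot_drop_last` is a hypothetical helper:

```python
def one_hot_drop_last(index, num_categories):
    """Sparse one-hot (size, indices, values) with the last category dropped."""
    size = num_categories - 1
    i = int(index)
    if not 0 <= i < num_categories:
        raise ValueError(f"index {index} out of range")
    if i == size:  # last category -> all-zeros vector
        return (size, [], [])
    return (size, [i], [1.0])


print(one_hot_drop_last(0.0, 8))  # → (7, [0], [1.0])  ORD
print(one_hot_drop_last(7.0, 8))  # → (7, [], [])      OGG
```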



## Consolidating the feature columns

In [35]:
from pyspark.ml.feature import VectorAssembler

In [36]:
assembler = VectorAssembler(inputCols=['km', 'org_dummy', 'depart_dummy'],
                            outputCol='features')
dfsFlightClean = assembler.transform(dfsClean)

dfsFlightClean.show()

+---+---+---+-------+------+---+------+--------+-----+----------+-----------+-------+-------------+-------------+-------------+--------------------+
|mon|dom|dow|carrier|flight|org|depart|duration|delay|        km|carrier_idx|org_idx|depart_bucket| depart_dummy|    org_dummy|            features|
+---+---+---+-------+------+---+------+--------+-----+----------+-----------+-------+-------------+-------------+-------------+--------------------+
| 10| 10|  1|     OO|  5836|ORD|  8.18|      51|   27| 252.66638|        2.0|    0.0|          2.0|(7,[2],[1.0])|(7,[0],[1.0])|(15,[0,1,10],[252...|
|  1|  4|  1|     OO|  5866|ORD|  15.5|     102|   -7| 749.95244|        2.0|    0.0|          5.0|(7,[5],[1.0])|(7,[0],[1.0])|(15,[0,1,13],[749...|
| 11| 22|  1|     OO|  6016|ORD|  7.17|     127|  -19|1187.69292|        2.0|    0.0|          2.0|(7,[2],[1.0])|(7,[0],[1.0])|(15,[0,1,10],[118...|
|  2| 14|  5|     B6|   199|JFK| 21.17|     365|   60|3617.79632|        4.0|    2.0|          7.0|    (7,
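`VectorAssembler` concatenates its `inputCols` in order. `km` occupies index 0, `org_dummy` takes indices 1 to 7, and `depart_dummy` takes indices 8 to 14, for 15 features in total. Below is a simplified plain-Python sketch that reproduces the first row's `features` indices. The `assemble` helper is hypothetical and always returns a sparse tuple, whereas Spark picks whichever of the dense or sparse representation is smaller:

```python
def assemble(km, org_dummy, depart_dummy):
    """Concatenate a scalar and two sparse (size, indices, values) vectors,
    in inputCols order, into one sparse (size, indices, values) tuple."""
    indices, values = ([0], [float(km)]) if km != 0 else ([], [])
    offset = 1  # the scalar km occupies index 0
    for size, idx, vals in (org_dummy, depart_dummy):
        indices.extend(offset + i for i in idx)
        values.extend(vals)
        offset += size
    return (offset, indices, values)


# First row: km=252.66638, org ORD -> (7,[0],[1.0]), depart bucket 2 -> (7,[2],[1.0])
features = assemble(252.66638, (7, [0], [1.0]), (7, [2], [1.0]))
print(features)  # → (15, [0, 1, 10], [252.66638, 1.0, 1.0])
```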

## Training and Test Sets


In [37]:
flyTrain, flyTest = dfsFlightClean.randomSplit([0.8, 0.2], seed=23)

[flyTest.count(), flyTrain.count()]

[55438, 219562]
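`randomSplit` normalizes the weights and assigns each row with an independent random draw. The resulting sizes therefore only approximate the 80/20 ratio: here the test set holds 55438 of 275000 rows, about 20.2%. Below is a simplified plain-Python sketch of that idea. The helper `random_split` is hypothetical and uses Python's `random`, not Spark's sampler, so it will not reproduce Spark's split for `seed=23`:

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split using a single uniform draw per row."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total  # cumulative upper bound of each split
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(10_000), [0.8, 0.2], seed=23)
print(len(train), len(test))  # close to, but usually not exactly, 8000 and 2000
```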

## Linear Regression


In [38]:
from pyspark.ml.regression import LinearRegression


In [39]:
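# Fit a linear regression predicting 'duration' from the default 'features' column
# (regr is reassigned from the estimator to the fitted LinearRegressionModel)
regr = LinearRegression(labelCol="duration")
regr = regr.fit(flyTrain)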

In [40]:
predictions = regr.transform(flyTest)

predictions.select('duration', 'prediction').show()

+--------+------------------+
|duration|        prediction|
+--------+------------------+
|     385|365.67624085665835|
|     325|  354.174899490941|
|     135|150.13570543761247|
|     310| 317.0198966862704|
|     130| 134.0926282412561|
|     150|153.11754954734403|
|     135|129.91015691699147|
|     125| 125.2983624091534|
|     285|265.44448520718015|
|     205| 209.7538719453969|
|     104| 119.7109526632165|
|      80| 79.66898121133191|
|     250|226.70723749942647|
|     255|231.55180945411982|
|     280|263.40916944346327|
|      70| 73.04103998960883|
|     205|193.96898456371187|
|     125| 128.2340145233423|
|      70| 77.22351131387347|
|      70|  77.3284949952303|
+--------+------------------+
only showing top 20 rows



## Evaluation Metrics



In [41]:
from pyspark.ml.evaluation import RegressionEvaluator

### RMSE

In [42]:
# metricName defaults to "rmse"
RegressionEvaluator(labelCol="duration").evaluate(predictions)

10.982506059776263

### MSE

In [43]:
RegressionEvaluator(labelCol="duration", metricName= "mse").evaluate(predictions)

120.61543935302232

### R^2

In [44]:
RegressionEvaluator(labelCol="duration", metricName= "r2").evaluate(predictions)

0.9841345071648551
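For reference, MSE is the mean squared residual, RMSE is its square root, and R^2 is 1 - SS_res/SS_tot. Below is a plain-Python sketch of those formulas using a hypothetical helper `regression_metrics` on toy data. It also checks that the RMSE reported above is the square root of the MSE reported above:

```python
import math


def regression_metrics(y_true, y_pred):
    """Return (rmse, mse, r2) for paired lists of labels and predictions."""
    n = len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    mean_y = sum(y_true) / n
    ss_tot = sum((t - mean_y) ** 2 for t in y_true)
    mse = ss_res / n
    return math.sqrt(mse), mse, 1 - ss_res / ss_tot


rmse, mse, r2 = regression_metrics([1, 2, 3], [1, 2, 4])  # toy data
print(rmse, mse, r2)  # mse = 1/3, r2 = 0.5

# The RMSE above should equal the square root of the MSE above
print(math.sqrt(120.61543935302232))  # ≈ 10.9825
```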

### Intercept

In [45]:
regr.intercept

10.503026653062964

### Coefficients

In [46]:
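# Not strictly needed: regr.coefficients.values is already a NumPy array
import array

In [50]:
# List the distinct (org_idx, depart_bucket) combinations present in the data
dfsFlightClean.select("org_idx", "depart_bucket").distinct().sort("org_idx").createOrReplaceTempView("Coeficientes")
spark.sql(" select * from Coeficientes ").show()
# Model coefficients, in the order of the 'features' vector: km, org_dummy (7), depart_dummy (7)
regr.coefficients.values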

+-------+-------------+
|org_idx|depart_bucket|
+-------+-------------+
|    0.0|          4.0|
|    0.0|          5.0|
|    0.0|          1.0|
|    0.0|          6.0|
|    0.0|          7.0|
|    0.0|          2.0|
|    0.0|          3.0|
|    1.0|          0.0|
|    1.0|          1.0|
|    1.0|          6.0|
|    1.0|          3.0|
|    1.0|          4.0|
|    1.0|          5.0|
|    1.0|          2.0|
|    1.0|          7.0|
|    2.0|          7.0|
|    2.0|          0.0|
|    2.0|          2.0|
|    2.0|          3.0|
|    2.0|          4.0|
+-------+-------------+
only showing top 20 rows



array([ 7.43935133e-02,  2.69497041e+01,  1.99369420e+01,  5.18747765e+01,
        4.58696418e+01,  1.50676841e+01,  1.71801267e+01,  1.74493592e+01,
       -1.46793330e+01,  9.99331306e-03,  4.03729875e+00,  6.98040737e+00,
        4.69939938e+00,  8.88187070e+00,  8.98685439e+00])
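The assembler order and `dropLast=True` determine how the 15 coefficients map to features. The first is `km`, then `org_idx` 0 to 6 (ORD, SFO, JFK, LGA, SMF, SJC, TUS), then `depart_bucket` 0 to 6. The org and departure coefficients are therefore offsets relative to the dropped baselines: OGG and the 21:00 to 24:00 bucket. The plain-Python sketch below labels the printed coefficients. It then reproduces a prediction as the intercept plus the dot product with a sparse feature vector, for the first row of the data (ORD, bucket 2.0, 252.66638 km). The helper names are hypothetical, and the numbers are copied from the outputs above:

```python
intercept = 10.503026653062964
coefficients = [
    7.43935133e-02, 2.69497041e+01, 1.99369420e+01, 5.18747765e+01,
    4.58696418e+01, 1.50676841e+01, 1.71801267e+01, 1.74493592e+01,
    -1.46793330e+01, 9.99331306e-03, 4.03729875e+00, 6.98040737e+00,
    4.69939938e+00, 8.88187070e+00, 8.98685439e+00,
]
orgs = ["ORD", "SFO", "JFK", "LGA", "SMF", "SJC", "TUS"]  # org_idx 0..6; OGG (7) dropped
names = ["km"] + [f"org={o}" for o in orgs] + [f"depart_bucket={b}" for b in range(7)]


def predict(sparse_features):
    """intercept + sum of coefficient * value over the non-zero features."""
    _size, indices, values = sparse_features
    return intercept + sum(coefficients[i] * v for i, v in zip(indices, values))


for name, coef in zip(names, coefficients):
    print(f"{name:18s} {coef: .4f}")

first_row_prediction = predict((15, [0, 1, 10], [252.66638, 1.0, 1.0]))
print(first_row_prediction)  # about 60.29 minutes
```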

## Model Analysis

In terms of performance, every metric improved compared with the previous version of the model: R^2 is higher (by 0.08) and RMSE is lower by 0.75. Although the improvement is modest, it is interesting that such a small change in preprocessing produced these gains, which shows the usefulness of bucketing. It is worth noting that this time the data were split 0.8/0.2 (train/test), but this has a negligible effect on the model itself.

In [51]:
spark.stop()