# Installation

In [None]:
!pip install pyspark

# Import SparkSession
from pyspark.sql import SparkSession

# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Check Spark Session Information
spark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=0dec03c7c7b10f51e4e26412a5d4fed91db317d9e7fe29c1b5b4c430ea3ddabc
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql.types import *

schema = StructType([
StructField("id", LongType(), True),
StructField("name", StringType(), True),
StructField("age", LongType(), True),
StructField("eyeColor", StringType(), True)
])

# Generate comma delimited data
swimmers = spark.createDataFrame([
(123, 'Katie', 19, 'brown'),
(234, 'Michael', 22, 'green'),
(345, 'Simone', 23, 'blue')
],schema)


In [None]:
# Creates a temporary view using the DataFrame
swimmers.createOrReplaceTempView("swimmers")


In [None]:
# Get the id, age where age = 22
swimmers.select("id", "age").filter("age = 22").show()
# Another way to write the above query is below
swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age
== 22).show()


+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [None]:
# Get the name, eyeColor where eyeColor like 'b%'
swimmers.select("name", "eyeColor").filter("eyeColor like 'b%'").show()


+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



# Some data

In [None]:
dirty_data = spark.createDataFrame([
          (1,'Porsche','Boxster S','Turbo',2.5,4,22,None)
        , (2,'Aston Martin','Vanquish','Aspirated',6.0,12,16,None)
        , (3,'Porsche','911 Carrera 4S Cabriolet','Turbo',3.0,6,24,None)
        , (3,'General Motors','SPARK ACTIV','Aspirated',1.4,None,32,None)
        , (5,'BMW','COOPER S HARDTOP 2 DOOR','Turbo',2.0,4,26,None)
        , (6,'BMW','330i','Turbo',2.0,None,27,None)
        , (7,'BMW','440i Coupe','Turbo',3.0,6,23,None)
        , (8,'BMW','440i Coupe','Turbo',3.0,6,23,None)
        , (9,'Mercedes-Benz',None,None,None,None,27,None)
        , (10,'Mercedes-Benz','CLS 550','Turbo',4.7,8,21,79231)
        , (11,'Volkswagen','GTI','Turbo',2.0,4,None,None)
        , (12,'Ford Motor Company','FUSION AWD','Turbo',2.7,6,20,None)
        , (13,'Nissan','Q50 AWD RED SPORT','Turbo',3.0,6,22,None)
        , (14,'Nissan','Q70 AWD','Aspirated',5.6,8,18,None)
        , (15,'Kia','Stinger RWD','Turbo',2.0,4,25,None)
        , (16,'Toyota','CAMRY HYBRID LE','Aspirated',2.5,4,46,None)
        , (16,'Toyota','CAMRY HYBRID LE','Aspirated',2.5,4,46,None)
        , (18,'FCA US LLC','300','Aspirated',3.6,6,23,None)
        , (19,'Hyundai','G80 AWD','Turbo',3.3,6,20,None)
        , (20,'Hyundai','G80 AWD','Turbo',3.3,6,20,None)
        , (21,'BMW','X5 M','Turbo',4.4,8,18,121231)
        , (22,'GE','K1500 SUBURBAN 4WD','Aspirated',5.3,8,18,None)
    ], ['Id','Manufacturer','Model','EngineType','Displacement',
        'Cylinders','FuelEconomy','MSRP'])

# Handling duplicates

### Exact duplicates

In [None]:
# do we have any rows that are duplicated?
dirty_data.count(), dirty_data.distinct().count()

(22, 21)

In [None]:
# what row is duplicated?
(
    dirty_data
    .groupby(dirty_data.columns)
    .count()
    .filter('count > 1')
    .show()
)

+---+------------+---------------+----------+------------+---------+-----------+----+-----+
| Id|Manufacturer|          Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|count|
+---+------------+---------------+----------+------------+---------+-----------+----+-----+
| 16|      Toyota|CAMRY HYBRID LE| Aspirated|         2.5|        4|         46|NULL|    2|
+---+------------+---------------+----------+------------+---------+-----------+----+-----+



In [None]:
# remove the duplicated rows
full_removed = dirty_data.dropDuplicates()
full_removed.count()

21

### Only ID differs

In [None]:
# count of rows
no_ids = (
    full_removed
    .select([col for col in full_removed.columns if col != 'Id'])
)

no_ids.count(), no_ids.distinct().count()

(21, 19)

In [None]:
# what row is duplicated?
(
    full_removed
    .groupby([col for col in full_removed.columns if col != 'Id'])
    .count()
    .filter('count > 1')
    .show()
)

+------------+----------+----------+------------+---------+-----------+----+-----+
|Manufacturer|     Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|count|
+------------+----------+----------+------------+---------+-----------+----+-----+
|         BMW|440i Coupe|     Turbo|         3.0|        6|         23|NULL|    2|
|     Hyundai|   G80 AWD|     Turbo|         3.3|        6|         20|NULL|    2|
+------------+----------+----------+------------+---------+-----------+----+-----+



In [None]:
# remove the duplicated record
id_removed = full_removed.dropDuplicates(
    subset = [col for col in full_removed.columns if col != 'Id']
)

In [None]:
# count
id_removed.count()

19

### Duplicated IDs

In [None]:
# are there any duplicated IDs?
import pyspark.sql.functions as fn

id_removed.agg(
      fn.count('Id').alias('CountOfIDs')
    , fn.countDistinct('Id').alias('CountOfDistinctIDs')
).show()

+----------+------------------+
|CountOfIDs|CountOfDistinctIDs|
+----------+------------------+
|        19|                18|
+----------+------------------+



In [None]:
# what's duplicated?
(
    id_removed
    .groupby('Id')
    .count()
    .filter('count > 1')
    .show()
)

+---+-----+
| Id|count|
+---+-----+
|  3|    2|
+---+-----+



In [None]:
(
    id_removed
    .filter('Id = 3')
    .show()
)

+---+--------------+--------------------+----------+------------+---------+-----------+----+
| Id|  Manufacturer|               Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|
+---+--------------+--------------------+----------+------------+---------+-----------+----+
|  3|General Motors|         SPARK ACTIV| Aspirated|         1.4|     NULL|         32|NULL|
|  3|       Porsche|911 Carrera 4S Ca...|     Turbo|         3.0|        6|         24|NULL|
+---+--------------+--------------------+----------+------------+---------+-----------+----+



In [None]:
new_id = (
    id_removed
    .select(
        [fn.monotonically_increasing_id().alias('Id')] +
        [col for col in id_removed.columns if col != 'Id'])
)

new_id.show()

+---+------------------+--------------------+----------+------------+---------+-----------+------+
| Id|      Manufacturer|               Model|EngineType|Displacement|Cylinders|FuelEconomy|  MSRP|
+---+------------------+--------------------+----------+------------+---------+-----------+------+
|  0|     Mercedes-Benz|             CLS 550|     Turbo|         4.7|        8|         21| 79231|
|  1|               BMW|                X5 M|     Turbo|         4.4|        8|         18|121231|
|  2|    General Motors|         SPARK ACTIV| Aspirated|         1.4|     NULL|         32|  NULL|
|  3|     Mercedes-Benz|                NULL|      NULL|        NULL|     NULL|         27|  NULL|
|  4|               BMW|COOPER S HARDTOP ...|     Turbo|         2.0|        4|         26|  NULL|
|  5|      Aston Martin|            Vanquish| Aspirated|         6.0|       12|         16|  NULL|
|  6|        Volkswagen|                 GTI|     Turbo|         2.0|        4|       NULL|  NULL|
|  7|     

# Handling missing observations

### Missing observations per row

In [None]:
(
    spark.createDataFrame(
        new_id.rdd.map(
           lambda row: (
                 row['Id']
               , sum([c == None for c in row])
           )
        )
        .filter(lambda el: el[1] > 1)
        .collect()
        ,['Id', 'CountMissing']
    )
    .orderBy('CountMissing', ascending=False)
    .show()
)

+---+------------+
| Id|CountMissing|
+---+------------+
|  3|           5|
|  6|           2|
|  2|           2|
|  7|           2|
+---+------------+



In [None]:
(
    new_id
    .where('Id == 3')
    .show()
)

+---+-------------+-----+----------+------------+---------+-----------+----+
| Id| Manufacturer|Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|
+---+-------------+-----+----------+------------+---------+-----------+----+
|  3|Mercedes-Benz| NULL|      NULL|        NULL|     NULL|         27|NULL|
+---+-------------+-----+----------+------------+---------+-----------+----+



In [None]:
merc_out = new_id.dropna(thresh=4)
new_id.count(), merc_out.count()

(19, 18)

In [None]:
(
    merc_out
    .where('Id == 3')
    .show()
)

+---+------------+-----+----------+------------+---------+-----------+----+
| Id|Manufacturer|Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|
+---+------------+-----+----------+------------+---------+-----------+----+
+---+------------+-----+----------+------------+---------+-----------+----+



### Missing observations per column

In [None]:
for k, v in sorted(
    merc_out.agg(*[
               (1 - (fn.count(c) / fn.count('*')))
                    .alias(c + '_miss')
               for c in merc_out.columns
           ])
        .collect()[0]
        .asDict()
        .items()
    , key=lambda el: el[1]
    , reverse=True
):
    print(k, v)

MSRP_miss 0.8888888888888888
Cylinders_miss 0.11111111111111116
FuelEconomy_miss 0.05555555555555558
Id_miss 0.0
Manufacturer_miss 0.0
Model_miss 0.0
EngineType_miss 0.0
Displacement_miss 0.0


In [None]:
no_MSRP = merc_out.select([col for col in new_id.columns if col != 'MSRP'])
no_MSRP.show()

+---+------------------+--------------------+----------+------------+---------+-----------+
| Id|      Manufacturer|               Model|EngineType|Displacement|Cylinders|FuelEconomy|
+---+------------------+--------------------+----------+------------+---------+-----------+
|  0|     Mercedes-Benz|             CLS 550|     Turbo|         4.7|        8|         21|
|  1|               BMW|                X5 M|     Turbo|         4.4|        8|         18|
|  2|    General Motors|         SPARK ACTIV| Aspirated|         1.4|     NULL|         32|
|  4|               BMW|COOPER S HARDTOP ...|     Turbo|         2.0|        4|         26|
|  5|      Aston Martin|            Vanquish| Aspirated|         6.0|       12|         16|
|  6|        Volkswagen|                 GTI|     Turbo|         2.0|        4|       NULL|
|  7|               BMW|                330i|     Turbo|         2.0|     NULL|         27|
|  8|           Porsche|           Boxster S|     Turbo|         2.5|        4| 

### Sparse missing observations

In [None]:
multipliers = (
    no_MSRP
    .agg(
          fn.mean(
              fn.col('FuelEconomy') /
              (
                  fn.col('Displacement') * fn.col('Cylinders')
              )
          ).alias('FuelEconomy')
        , fn.mean(
            fn.col('Cylinders') /
            fn.col('Displacement')
        ).alias('Cylinders')
    )
).toPandas().to_dict('records')[0]

multipliers

{'FuelEconomy': 1.4957485048359977, 'Cylinders': 1.8353365984789107}

In [None]:
imputed = (
    no_MSRP
    .withColumn('FuelEconomy', fn.col('FuelEconomy')   / fn.col('Displacement') / fn.col('Cylinders'))
    .withColumn('Cylinders',   fn.col('Cylinders')   / fn.col('Displacement'))
    .fillna(multipliers)
    .withColumn('Cylinders',   (fn.col('Cylinders')   * fn.col('Displacement')).cast('integer'))
    .withColumn('FuelEconomy', fn.col('FuelEconomy') * fn.col('Displacement') * fn.col('Cylinders'))
)

imputed.show()

+---+------------------+--------------------+----------+------------+---------+------------------+
| Id|      Manufacturer|               Model|EngineType|Displacement|Cylinders|       FuelEconomy|
+---+------------------+--------------------+----------+------------+---------+------------------+
|  0|     Mercedes-Benz|             CLS 550|     Turbo|         4.7|        8|              21.0|
|  1|               BMW|                X5 M|     Turbo|         4.4|        8|              18.0|
|  2|    General Motors|         SPARK ACTIV| Aspirated|         1.4|        2|4.1880958135407935|
|  4|               BMW|COOPER S HARDTOP ...|     Turbo|         2.0|        4|              26.0|
|  5|      Aston Martin|            Vanquish| Aspirated|         6.0|       12|              16.0|
|  6|        Volkswagen|                 GTI|     Turbo|         2.0|        4|11.965988038687982|
|  7|               BMW|                330i|     Turbo|         2.0|        3| 8.974491029015986|
|  8|     

# Handling outliers

In [None]:
features = ['Displacement', 'Cylinders', 'FuelEconomy']
quantiles = [0.25, 0.75]

cut_off_points = []

for feature in features:
    quants = imputed.approxQuantile(feature, quantiles, 0.05)

    IQR = quants[1] - quants[0]
    cut_off_points.append((feature, [
        quants[0] - 1.5 * IQR,
        quants[1] + 1.5 * IQR,
    ]))

cut_off_points = dict(cut_off_points)

outliers = imputed.select(*['id'] + [
       (
           (imputed[f] < cut_off_points[f][0]) |
           (imputed[f] > cut_off_points[f][1])
       ).alias(f + '_o') for f in features
  ])
outliers.show()

+---+--------------+-----------+-------------+
| id|Displacement_o|Cylinders_o|FuelEconomy_o|
+---+--------------+-----------+-------------+
|  0|         false|      false|        false|
|  1|         false|      false|        false|
|  2|         false|      false|         true|
|  4|         false|      false|        false|
|  5|         false|       true|        false|
|  6|         false|      false|        false|
|  7|         false|      false|        false|
|  8|         false|      false|        false|
|  9|         false|      false|        false|
| 10|         false|      false|        false|
| 11|         false|      false|        false|
| 12|         false|      false|        false|
| 13|         false|      false|        false|
| 14|         false|      false|        false|
| 15|         false|      false|        false|
| 16|         false|      false|        false|
| 17|         false|      false|         true|
| 18|         false|      false|        false|
+---+--------

In [None]:
with_outliers_flag = imputed.join(outliers, on='Id')

(
    with_outliers_flag
    .filter('FuelEconomy_o')
    .select('Id', 'Manufacturer', 'Model', 'FuelEconomy')
    .show()
)

+---+--------------+---------------+------------------+
| Id|  Manufacturer|          Model|       FuelEconomy|
+---+--------------+---------------+------------------+
|  2|General Motors|    SPARK ACTIV|4.1880958135407935|
| 17|        Toyota|CAMRY HYBRID LE|              46.0|
+---+--------------+---------------+------------------+



In [None]:
no_outliers = (
    with_outliers_flag
    .filter('!FuelEconomy_o')
    .select(imputed.columns)
)

# Exploring descriptive statistics

In [None]:
descriptive_stats = no_outliers.describe(features)
descriptive_stats.show()

+-------+------------------+-----------------+------------------+
|summary|      Displacement|        Cylinders|       FuelEconomy|
+-------+------------------+-----------------+------------------+
|  count|                16|               16|                16|
|   mean|           3.44375|            6.125|19.600446608398162|
| stddev|1.3549753995306826|2.276693508870558|4.6666477673737505|
|    min|               2.0|                3| 8.974491029015986|
|    max|               6.0|               12|              26.0|
+-------+------------------+-----------------+------------------+



In [None]:
descriptive_stats_all = no_outliers.describe()
descriptive_stats_all.show()

+-------+-----------------+------------+-----+----------+------------------+-----------------+------------------+
|summary|               Id|Manufacturer|Model|EngineType|      Displacement|        Cylinders|       FuelEconomy|
+-------+-----------------+------------+-----+----------+------------------+-----------------+------------------+
|  count|               16|          16|   16|        16|                16|               16|                16|
|   mean|           9.3125|        NULL|300.0|      NULL|           3.44375|            6.125|19.600446608398162|
| stddev|5.287958017987662|        NULL| NULL|      NULL|1.3549753995306826|2.276693508870558|4.6666477673737505|
|    min|                0|Aston Martin|  300| Aspirated|               2.0|                3| 8.974491029015986|
|    max|               18|  Volkswagen| X5 M|     Turbo|               6.0|               12|              26.0|
+-------+-----------------+------------+-----+----------+------------------+------------

In [None]:
(
    no_outliers
    .select(features)
    .groupBy('Cylinders')
    .agg(*[
          fn.count('*').alias('Count')
        , fn.mean('FuelEconomy').alias('MPG_avg')
        , fn.mean('Displacement').alias('Disp_avg')
        , fn.stddev('FuelEconomy').alias('MPG_stdev')
        , fn.stddev('Displacement').alias('Disp_stdev')
    ])
    .orderBy('Cylinders')
).show()

+---------+-----+------------------+------------------+------------------+-------------------+
|Cylinders|Count|           MPG_avg|          Disp_avg|         MPG_stdev|         Disp_stdev|
+---------+-----+------------------+------------------+------------------+-------------------+
|        3|    1| 8.974491029015986|               2.0|              NULL|               NULL|
|        4|    4|21.241497009671995|             2.125| 6.413009924998987|0.24999999999999997|
|        5|    1|16.666666666666668|               2.7|              NULL|               NULL|
|        6|    5|              22.4|3.1799999999999997|1.5165750888103096| 0.2683281572999748|
|        8|    4|             18.75|               5.0|               1.5| 0.5477225575051655|
|       12|    1|              16.0|               6.0|              NULL|               NULL|
+---------+-----+------------------+------------------+------------------+-------------------+



# Computing correlations

In [None]:
(
    no_outliers
    .corr('Cylinders', 'Displacement')
)

0.9381829964408112

In [None]:
n_features = len(features)

corr = []

for i in range(0, n_features):
    temp = [None] * i

    for j in range(i, n_features):
        temp.append(no_outliers.corr(features[i], features[j]))
    corr.append([features[i]] + temp)

correlations = spark.createDataFrame(corr, ['Column'] + features)

correlations.show()

+------------+------------+------------------+--------------------+
|      Column|Displacement|         Cylinders|         FuelEconomy|
+------------+------------+------------------+--------------------+
|Displacement|         1.0|0.9381829964408112|-0.10757908872387667|
|   Cylinders|        NULL|               1.0| -0.0421854654503533|
| FuelEconomy|        NULL|              NULL|                 1.0|
+------------+------------+------------------+--------------------+



#EJERCICIO
- Cargar los datos desde el archivo CSV.
- Eliminar duplicados.
- Imputar valores faltantes utilizando la media para las columnas numéricas.
- Detectar y filtrar outliers en la columna "altura" utilizando el rango intercuartílico.
- Mostrar el DataFrame resultante.

In [None]:
schema = StructType([
    StructField("id", IntegerType(), nullable=True),
    StructField("edad", IntegerType(), nullable=True),
    StructField("altura", IntegerType(), nullable=True),
    StructField("peso", IntegerType(), nullable=True)
])

# Definir los datos
data = [
    (1, 25, 170, 70),
    (2, 30, 165, 65),
    (3, 40, None, 80),
    (4, None, 190, 55),
    (5, 35, 195, 75),
    (6, 45, 185, 85),
    (7, 28, 172, 72),
    (8, 25, None, 68),
    (9, 32, 198, 78),
    (10, 38, 176, 76),
    (11, 22, 158, 53),
    (12, None, 173, 71),
    (13, 33, 166, 77),
    (14, 37, 174, 74),
    (15, 42, 182, 83),
    (1, None, 169, 69),
    (2, 29, 164, 64),
    (3, 41, 179, 79),
    (16, 45, 185, 85),
    (17, 28, 172, 72)
]
# Crear el DataFrame
df = spark.createDataFrame(data, schema)

# Mostrar el DataFrame
df.show()



+---+----+------+----+
| id|edad|altura|peso|
+---+----+------+----+
|  1|  25|   170|  70|
|  2|  30|   165|  65|
|  3|  40|  NULL|  80|
|  4|NULL|   190|  55|
|  5|  35|   195|  75|
|  6|  45|   185|  85|
|  7|  28|   172|  72|
|  8|  25|  NULL|  68|
|  9|  32|   198|  78|
| 10|  38|   176|  76|
| 11|  22|   158|  53|
| 12|NULL|   173|  71|
| 13|  33|   166|  77|
| 14|  37|   174|  74|
| 15|  42|   182|  83|
|  1|NULL|   169|  69|
|  2|  29|   164|  64|
|  3|  41|   179|  79|
| 16|  45|   185|  85|
| 17|  28|   172|  72|
+---+----+------+----+

+---+----+------+----+
| id|edad|altura|peso|
+---+----+------+----+
|  1|  25|   170|  70|
|  2|  30|   165|  65|
|  3|  40|  NULL|  80|
|  4|NULL|   190|  55|
|  5|  35|   195|  75|
|  6|  45|   185|  85|
|  7|  28|   172|  72|
|  8|  25|  NULL|  68|
|  9|  32|   198|  78|
| 10|  38|   176|  76|
| 11|  22|   158|  53|
| 12|NULL|   173|  71|
| 13|  33|   166|  77|
| 14|  37|   174|  74|
| 15|  42|   182|  83|
|  1|NULL|   169|  69|
|  2|  29|

In [None]:
# are there any duplicated IDs?
import pyspark.sql.functions as fn

df.agg(
      fn.count('id').alias('CountOfIDs')
    , fn.countDistinct('id').alias('CountOfDistinctIDs')
).show()

+----------+------------------+
|CountOfIDs|CountOfDistinctIDs|
+----------+------------------+
|        20|                17|
+----------+------------------+



In [None]:
# count of rows
no_ids = (
    df
    .select([col for col in df.columns if col != 'id'])
)

no_ids.count(), no_ids.distinct().count()

(20, 18)

In [None]:
# what row is duplicated?
(
    df
    .groupby(df.columns)
    .count()
    .filter('count > 1')
    .show()
)

+---+----+------+----+-----+
| id|edad|altura|peso|count|
+---+----+------+----+-----+
+---+----+------+----+-----+



In [None]:
for k, v in sorted(
    df.agg(*[
               (1 - (fn.count(c) / fn.count('*')))
                    .alias(c + '_miss')
               for c in df.columns
           ])
        .collect()[0]
        .asDict()
        .items()
    , key=lambda el: el[1]
    , reverse=True
):
    print(k, v)

edad_miss 0.15000000000000002
altura_miss 0.09999999999999998
id_miss 0.0
peso_miss 0.0


In [None]:
descriptive_stats = df.describe(df.columns)
descriptive_stats.show()

descriptive_stats["id"]["mean"]

+-------+-----------------+------------------+------------------+-----------------+
|summary|               id|              edad|            altura|             peso|
+-------+-----------------+------------------+------------------+-----------------+
|  count|               20|                17|                18|               20|
|   mean|             7.95|  33.8235294117647|176.27777777777777|            72.55|
| stddev|5.306153329663093|7.2477176935022705| 11.02300031514616|8.732727312574777|
|    min|                1|                22|               158|               53|
|    max|               17|                45|               198|               85|
+-------+-----------------+------------------+------------------+-----------------+



Column<'id[mean]'>