# PySpark

In [1]:
from pyspark.sql import SparkSession

## 1. Inizializzare sessione Spark

In [2]:
# Create (or reuse) the Spark session used by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName('training')
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/28 12:31:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [76]:
spark

## 2. Caricare dati Spark

Senza specificare opzioni, tutte le colonne sono lette come stringhe e viene assegnato loro un nome generico (_c0, _c1, ...).

In [83]:
df_spark = spark.read.csv('data/test1.csv')

In [84]:
df_spark

DataFrame[_c0: string, _c1: string]

In [85]:
df_spark.show()

+--------+---+
|     _c0|_c1|
+--------+---+
|    Name|Age|
|   Marco| 35|
|Giovanni| 37|
|    Luca| 24|
|   Bruno| 25|
| Filippo| 18|
+--------+---+



In [86]:
df_spark = spark.read.option('header', 'true').csv('data/test1.csv')

In [87]:
df_spark

DataFrame[Name: string, Age: string]

In [88]:
df_spark.show()

+--------+---+
|    Name|Age|
+--------+---+
|   Marco| 35|
|Giovanni| 37|
|    Luca| 24|
|   Bruno| 25|
| Filippo| 18|
+--------+---+



* header --> utilizza la prima riga come nome delle colonne
* inferSchema --> deduce il tipo di colonna

In [89]:
df_spark = spark.read.csv('data/test1.csv', header=True, inferSchema=True)

In [90]:
df_spark

DataFrame[Name: string, Age: int]

In [91]:
df_spark.show()

+--------+---+
|    Name|Age|
+--------+---+
|   Marco| 35|
|Giovanni| 37|
|    Luca| 24|
|   Bruno| 25|
| Filippo| 18|
+--------+---+



## 3. Visualizzare Dataframe

In [92]:
df_spark.head(3)

[Row(Name='Marco', Age=35),
 Row(Name='Giovanni', Age=37),
 Row(Name='Luca', Age=24)]

In [93]:
df_spark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)



## 4. Dataframe
* PySpark Dataframe
* Reading dataset
* Select columns
* Describe dataframe
* Add columns
* Drop columns
* Rename columns

In [95]:
df_spark = spark.read.csv('data/iris.csv', header=True, inferSchema=True)

In [96]:
df_spark

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, variety: string]

In [51]:
df_spark.show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| Setosa|
|         4.9|        3.0|         1.4|        0.2| Setosa|
|         4.7|        3.2|         1.3|        0.2| Setosa|
|         4.6|        3.1|         1.5|        0.2| Setosa|
|         5.0|        3.6|         1.4|        0.2| Setosa|
|         5.4|        3.9|         1.7|        0.4| Setosa|
|         4.6|        3.4|         1.4|        0.3| Setosa|
|         5.0|        3.4|         1.5|        0.2| Setosa|
|         4.4|        2.9|         1.4|        0.2| Setosa|
|         4.9|        3.1|         1.5|        0.1| Setosa|
|         5.4|        3.7|         1.5|        0.2| Setosa|
|         4.8|        3.4|         1.6|        0.2| Setosa|
|         4.8|        3.0|         1.4|        0.1| Setosa|
|         4.3|        3.0|         1.1| 

In [97]:
df_spark.head(5)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, variety='Setosa'),
 Row(sepal_length=4.9, sepal_width=3.0, petal_length=1.4, petal_width=0.2, variety='Setosa'),
 Row(sepal_length=4.7, sepal_width=3.2, petal_length=1.3, petal_width=0.2, variety='Setosa'),
 Row(sepal_length=4.6, sepal_width=3.1, petal_length=1.5, petal_width=0.2, variety='Setosa'),
 Row(sepal_length=5.0, sepal_width=3.6, petal_length=1.4, petal_width=0.2, variety='Setosa')]

In [98]:
df_spark.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- variety: string (nullable = true)



### Select columns

In [99]:
df_spark.select('variety')

DataFrame[variety: string]

In [100]:
df_spark.select('variety').show()

+-------+
|variety|
+-------+
| Setosa|
| Setosa|
| Setosa|
| Setosa|
| Setosa|
| Setosa|
| Setosa|
| Setosa|
| Setosa|
| Setosa|
| Setosa|
| Setosa|
| Setosa|
| Setosa|
| Setosa|
| Setosa|
| Setosa|
| Setosa|
| Setosa|
| Setosa|
+-------+
only showing top 20 rows



In [101]:
df_spark.select(['sepal_length', 'variety'])

DataFrame[sepal_length: double, variety: string]

In [102]:
df_spark.select(['sepal_length', 'variety']).show()

+------------+-------+
|sepal_length|variety|
+------------+-------+
|         5.1| Setosa|
|         4.9| Setosa|
|         4.7| Setosa|
|         4.6| Setosa|
|         5.0| Setosa|
|         5.4| Setosa|
|         4.6| Setosa|
|         5.0| Setosa|
|         4.4| Setosa|
|         4.9| Setosa|
|         5.4| Setosa|
|         4.8| Setosa|
|         4.8| Setosa|
|         4.3| Setosa|
|         5.8| Setosa|
|         5.7| Setosa|
|         5.4| Setosa|
|         5.1| Setosa|
|         5.7| Setosa|
|         5.1| Setosa|
+------------+-------+
only showing top 20 rows



In [103]:
df_spark.variety

Column<'variety'>

In [104]:
df_spark.dtypes

[('sepal_length', 'double'),
 ('sepal_width', 'double'),
 ('petal_length', 'double'),
 ('petal_width', 'double'),
 ('variety', 'string')]

### Describe Dataframe

In [105]:
df_spark.describe()

DataFrame[summary: string, sepal_length: string, sepal_width: string, petal_length: string, petal_width: string, variety: string]

In [106]:
df_spark.describe().show()

+-------+------------------+-------------------+------------------+------------------+---------+
|summary|      sepal_length|        sepal_width|      petal_length|       petal_width|  variety|
+-------+------------------+-------------------+------------------+------------------+---------+
|  count|               150|                150|               150|               150|      150|
|   mean| 5.843333333333335|  3.057333333333334|3.7580000000000027| 1.199333333333334|     null|
| stddev|0.8280661279778637|0.43586628493669793|1.7652982332594662|0.7622376689603467|     null|
|    min|               4.3|                2.0|               1.0|               0.1|   Setosa|
|    max|               7.9|                4.4|               6.9|               2.5|Virginica|
+-------+------------------+-------------------+------------------+------------------+---------+



### Add columns

In [107]:
df_spark.withColumn('sepal', (df_spark['sepal_length'] + df_spark['sepal_width'])/2)

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, variety: string, sepal: double]

In [108]:
df_spark.withColumn('sepal', (df_spark['sepal_length'] + df_spark['sepal_width'])/2).show()

+------------+-----------+------------+-----------+-------+------------------+
|sepal_length|sepal_width|petal_length|petal_width|variety|             sepal|
+------------+-----------+------------+-----------+-------+------------------+
|         5.1|        3.5|         1.4|        0.2| Setosa|               4.3|
|         4.9|        3.0|         1.4|        0.2| Setosa|              3.95|
|         4.7|        3.2|         1.3|        0.2| Setosa|              3.95|
|         4.6|        3.1|         1.5|        0.2| Setosa|3.8499999999999996|
|         5.0|        3.6|         1.4|        0.2| Setosa|               4.3|
|         5.4|        3.9|         1.7|        0.4| Setosa|              4.65|
|         4.6|        3.4|         1.4|        0.3| Setosa|               4.0|
|         5.0|        3.4|         1.5|        0.2| Setosa|               4.2|
|         4.4|        2.9|         1.4|        0.2| Setosa|3.6500000000000004|
|         4.9|        3.1|         1.5|        0.1| 

### Drop columns

In [109]:
# Persist the derived 'sepal' column (mean of sepal_length and sepal_width) on df_spark
# so that there is a column to remove with drop() further down in this section.
df_spark = df_spark.withColumn('sepal', (df_spark['sepal_length'] + df_spark['sepal_width'])/2)

In [110]:
df_spark.show()

+------------+-----------+------------+-----------+-------+------------------+
|sepal_length|sepal_width|petal_length|petal_width|variety|             sepal|
+------------+-----------+------------+-----------+-------+------------------+
|         5.1|        3.5|         1.4|        0.2| Setosa|               4.3|
|         4.9|        3.0|         1.4|        0.2| Setosa|              3.95|
|         4.7|        3.2|         1.3|        0.2| Setosa|              3.95|
|         4.6|        3.1|         1.5|        0.2| Setosa|3.8499999999999996|
|         5.0|        3.6|         1.4|        0.2| Setosa|               4.3|
|         5.4|        3.9|         1.7|        0.4| Setosa|              4.65|
|         4.6|        3.4|         1.4|        0.3| Setosa|               4.0|
|         5.0|        3.4|         1.5|        0.2| Setosa|               4.2|
|         4.4|        2.9|         1.4|        0.2| Setosa|3.6500000000000004|
|         4.9|        3.1|         1.5|        0.1| 

In [111]:
df_spark = df_spark.drop('sepal')

In [112]:
df_spark.show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| Setosa|
|         4.9|        3.0|         1.4|        0.2| Setosa|
|         4.7|        3.2|         1.3|        0.2| Setosa|
|         4.6|        3.1|         1.5|        0.2| Setosa|
|         5.0|        3.6|         1.4|        0.2| Setosa|
|         5.4|        3.9|         1.7|        0.4| Setosa|
|         4.6|        3.4|         1.4|        0.3| Setosa|
|         5.0|        3.4|         1.5|        0.2| Setosa|
|         4.4|        2.9|         1.4|        0.2| Setosa|
|         4.9|        3.1|         1.5|        0.1| Setosa|
|         5.4|        3.7|         1.5|        0.2| Setosa|
|         4.8|        3.4|         1.6|        0.2| Setosa|
|         4.8|        3.0|         1.4|        0.1| Setosa|
|         4.3|        3.0|         1.1| 

### Rename columns

In [114]:
df_spark.withColumnRenamed('variety', 'class')

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, class: string]

In [115]:
df_spark.withColumnRenamed('variety', 'class').show()

+------------+-----------+------------+-----------+------+
|sepal_length|sepal_width|petal_length|petal_width| class|
+------------+-----------+------------+-----------+------+
|         5.1|        3.5|         1.4|        0.2|Setosa|
|         4.9|        3.0|         1.4|        0.2|Setosa|
|         4.7|        3.2|         1.3|        0.2|Setosa|
|         4.6|        3.1|         1.5|        0.2|Setosa|
|         5.0|        3.6|         1.4|        0.2|Setosa|
|         5.4|        3.9|         1.7|        0.4|Setosa|
|         4.6|        3.4|         1.4|        0.3|Setosa|
|         5.0|        3.4|         1.5|        0.2|Setosa|
|         4.4|        2.9|         1.4|        0.2|Setosa|
|         4.9|        3.1|         1.5|        0.1|Setosa|
|         5.4|        3.7|         1.5|        0.2|Setosa|
|         4.8|        3.4|         1.6|        0.2|Setosa|
|         4.8|        3.0|         1.4|        0.1|Setosa|
|         4.3|        3.0|         1.1|        0.1|Setos

## 5. Missing Value

In [128]:
df_spark = spark.read.csv('data/test2.csv', header=True, inferSchema=True)

In [129]:
df_spark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [130]:
df_spark.show()

+--------+----+----+------+
|    Name| Age|Year|Salary|
+--------+----+----+------+
|   Marco|  35|   1| 30000|
|Giovanni|  37|   6| 35000|
|    Luca|  24|null| 12000|
|   Bruno|null|   5| 76000|
|    null|  18|null| 32000|
|   Italo|  22|   3| 27000|
|  Giaomo|null|null|  null|
| Alfonso|null|null| 11000|
+--------+----+----+------+



### Drop rows with at least 1 null

In [131]:
df_spark.na.drop().show()

+--------+---+----+------+
|    Name|Age|Year|Salary|
+--------+---+----+------+
|   Marco| 35|   1| 30000|
|Giovanni| 37|   6| 35000|
|   Italo| 22|   3| 27000|
+--------+---+----+------+



### Keep rows with at least N non-null values

In [136]:
df_spark.na.drop(thresh=2).show()

+--------+----+----+------+
|    Name| Age|Year|Salary|
+--------+----+----+------+
|   Marco|  35|   1| 30000|
|Giovanni|  37|   6| 35000|
|    Luca|  24|null| 12000|
|   Bruno|null|   5| 76000|
|    null|  18|null| 32000|
|   Italo|  22|   3| 27000|
| Alfonso|null|null| 11000|
+--------+----+----+------+



In [137]:
df_spark.na.drop(thresh=3).show()

+--------+----+----+------+
|    Name| Age|Year|Salary|
+--------+----+----+------+
|   Marco|  35|   1| 30000|
|Giovanni|  37|   6| 35000|
|    Luca|  24|null| 12000|
|   Bruno|null|   5| 76000|
|   Italo|  22|   3| 27000|
+--------+----+----+------+



### Drop Null for specific columns

In [140]:
df_spark.na.drop(subset=['Name', 'Salary']).show()

+--------+----+----+------+
|    Name| Age|Year|Salary|
+--------+----+----+------+
|   Marco|  35|   1| 30000|
|Giovanni|  37|   6| 35000|
|    Luca|  24|null| 12000|
|   Bruno|null|   5| 76000|
|   Italo|  22|   3| 27000|
| Alfonso|null|null| 11000|
+--------+----+----+------+



In [141]:
df_spark.na.drop(subset='Name').show()

+--------+----+----+------+
|    Name| Age|Year|Salary|
+--------+----+----+------+
|   Marco|  35|   1| 30000|
|Giovanni|  37|   6| 35000|
|    Luca|  24|null| 12000|
|   Bruno|null|   5| 76000|
|   Italo|  22|   3| 27000|
| Giacomo|null|null|  null|
| Alfonso|null|null| 11000|
+--------+----+----+------+



### Fill NA

In [150]:
# An integer fill value is applied to the numeric columns only: in the output below the
# null in the string column Name is left untouched.
df_spark.na.fill(999999).show()

+--------+------+------+------+
|    Name|   Age|  Year|Salary|
+--------+------+------+------+
|   Marco|    35|     1| 30000|
|Giovanni|    37|     6| 35000|
|    Luca|    24|999999| 12000|
|   Bruno|999999|     5| 76000|
|    null|    18|999999| 32000|
|   Italo|    22|     3| 27000|
| Giacomo|999999|999999|999999|
| Alfonso|999999|999999| 11000|
+--------+------+------+------+



In [147]:
df_spark.na.fill('Sconosciuto').show()

+-----------+----+----+------+
|       Name| Age|Year|Salary|
+-----------+----+----+------+
|      Marco|  35|   1| 30000|
|   Giovanni|  37|   6| 35000|
|       Luca|  24|null| 12000|
|      Bruno|null|   5| 76000|
|Sconosciuto|  18|null| 32000|
|      Italo|  22|   3| 27000|
|    Giacomo|null|null|  null|
|    Alfonso|null|null| 11000|
+-----------+----+----+------+



In [151]:
# Per-column fill values. NOTE(review): 'Age' is an integer column and the string
# 'missing' is not applied to it -- the output below still shows null for Age.
# Use a numeric value for 'Age' if it is meant to be filled.
df_spark.na.fill({'Name': 'Sconosciuto', 'Age': 'missing', 'Year': 0}).show()

+-----------+----+----+------+
|       Name| Age|Year|Salary|
+-----------+----+----+------+
|      Marco|  35|   1| 30000|
|   Giovanni|  37|   6| 35000|
|       Luca|  24|   0| 12000|
|      Bruno|null|   5| 76000|
|Sconosciuto|  18|   0| 32000|
|      Italo|  22|   3| 27000|
|    Giacomo|null|   0|  null|
|    Alfonso|null|   0| 11000|
+-----------+----+----+------+



### Fill NA with mean/median

In [169]:
from pyspark.ml.feature import Imputer

# Columns to impute. Defined in this cell so it no longer depends on an
# `input_cols` variable that only existed in a previous kernel state.
input_cols = ['Age', 'Year', 'Salary']

# Fill nulls in each input column with that column's mean; the result is
# written to a parallel "<column>_imputed" column, the originals are kept.
imputer = Imputer(
    inputCols = input_cols,
    outputCols = [f"{i}_imputed" for i in input_cols]
).setStrategy('mean')

imputer.fit(df_spark).transform(df_spark).show()

+--------+----+----+------+-----------+------------+--------------+
|    Name| Age|Year|Salary|Age_imputed|Year_imputed|Salary_imputed|
+--------+----+----+------+-----------+------------+--------------+
|   Marco|  35|   1| 30000|         35|           1|         30000|
|Giovanni|  37|   6| 35000|         37|           6|         35000|
|    Luca|  24|null| 12000|         24|           3|         12000|
|   Bruno|null|   5| 76000|         27|           5|         76000|
|    null|  18|null| 32000|         18|           3|         32000|
|   Italo|  22|   3| 27000|         22|           3|         27000|
| Giacomo|null|null|  null|         27|           3|         31857|
| Alfonso|null|null| 11000|         27|           3|         11000|
+--------+----+----+------+-----------+------------+--------------+



In [170]:
# Columns to impute. Defined in this cell so it runs on a fresh kernel instead
# of relying on an `input_cols` variable that is never assigned in the notebook.
input_cols = ['Age', 'Year', 'Salary']

# Same imputation as the mean variant, but nulls are filled with the column
# median; results go to parallel "<column>_imputed" columns.
imputer = Imputer(
    inputCols = input_cols,
    outputCols = [f"{i}_imputed" for i in input_cols]
).setStrategy('median')

imputer.fit(df_spark).transform(df_spark).show()

+--------+----+----+------+-----------+------------+--------------+
|    Name| Age|Year|Salary|Age_imputed|Year_imputed|Salary_imputed|
+--------+----+----+------+-----------+------------+--------------+
|   Marco|  35|   1| 30000|         35|           1|         30000|
|Giovanni|  37|   6| 35000|         37|           6|         35000|
|    Luca|  24|null| 12000|         24|           3|         12000|
|   Bruno|null|   5| 76000|         24|           5|         76000|
|    null|  18|null| 32000|         18|           3|         32000|
|   Italo|  22|   3| 27000|         22|           3|         27000|
| Giacomo|null|null|  null|         24|           3|         30000|
| Alfonso|null|null| 11000|         24|           3|         11000|
+--------+----+----+------+-----------+------------+--------------+



### Filter Dataframe
* Filter operation
* &, |, ==
* ~

In [173]:
df_spark = spark.read.csv('data/iris.csv', header=True, inferSchema=True)

In [174]:
df_spark.show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| Setosa|
|         4.9|        3.0|         1.4|        0.2| Setosa|
|         4.7|        3.2|         1.3|        0.2| Setosa|
|         4.6|        3.1|         1.5|        0.2| Setosa|
|         5.0|        3.6|         1.4|        0.2| Setosa|
|         5.4|        3.9|         1.7|        0.4| Setosa|
|         4.6|        3.4|         1.4|        0.3| Setosa|
|         5.0|        3.4|         1.5|        0.2| Setosa|
|         4.4|        2.9|         1.4|        0.2| Setosa|
|         4.9|        3.1|         1.5|        0.1| Setosa|
|         5.4|        3.7|         1.5|        0.2| Setosa|
|         4.8|        3.4|         1.6|        0.2| Setosa|
|         4.8|        3.0|         1.4|        0.1| Setosa|
|         4.3|        3.0|         1.1| 

In [187]:
df_spark.filter('sepal_length<4.5').show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|variety|
+------------+-----------+------------+-----------+-------+
|         4.4|        2.9|         1.4|        0.2| Setosa|
|         4.3|        3.0|         1.1|        0.1| Setosa|
|         4.4|        3.0|         1.3|        0.2| Setosa|
|         4.4|        3.2|         1.3|        0.2| Setosa|
+------------+-----------+------------+-----------+-------+



In [177]:
# Keep only the Versicolor rows, using an explicit filter() call in place of
# bracket indexing with a boolean Column.
df_spark.filter(df_spark['variety'] == 'Versicolor').show()

+------------+-----------+------------+-----------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   variety|
+------------+-----------+------------+-----------+----------+
|         7.0|        3.2|         4.7|        1.4|Versicolor|
|         6.4|        3.2|         4.5|        1.5|Versicolor|
|         6.9|        3.1|         4.9|        1.5|Versicolor|
|         5.5|        2.3|         4.0|        1.3|Versicolor|
|         6.5|        2.8|         4.6|        1.5|Versicolor|
|         5.7|        2.8|         4.5|        1.3|Versicolor|
|         6.3|        3.3|         4.7|        1.6|Versicolor|
|         4.9|        2.4|         3.3|        1.0|Versicolor|
|         6.6|        2.9|         4.6|        1.3|Versicolor|
|         5.2|        2.7|         3.9|        1.4|Versicolor|
|         5.0|        2.0|         3.5|        1.0|Versicolor|
|         5.9|        3.0|         4.2|        1.5|Versicolor|
|         6.0|        2.2|         4.0|        1.0|Vers

In [192]:
# Rows whose sepal_length lies strictly between 4.7 and 5 (both bounds exclusive).
df_spark.where(
    (df_spark.sepal_length > 4.7) & (df_spark.sepal_length < 5)
).show()

+------------+-----------+------------+-----------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   variety|
+------------+-----------+------------+-----------+----------+
|         4.9|        3.0|         1.4|        0.2|    Setosa|
|         4.9|        3.1|         1.5|        0.1|    Setosa|
|         4.8|        3.4|         1.6|        0.2|    Setosa|
|         4.8|        3.0|         1.4|        0.1|    Setosa|
|         4.8|        3.4|         1.9|        0.2|    Setosa|
|         4.8|        3.1|         1.6|        0.2|    Setosa|
|         4.9|        3.1|         1.5|        0.2|    Setosa|
|         4.9|        3.6|         1.4|        0.1|    Setosa|
|         4.8|        3.0|         1.4|        0.3|    Setosa|
|         4.9|        2.4|         3.3|        1.0|Versicolor|
|         4.9|        2.5|         4.5|        1.7| Virginica|
+------------+-----------+------------+-----------+----------+



In [195]:
df_spark.filter(~(df_spark['sepal_length']<7)).show()

+------------+-----------+------------+-----------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   variety|
+------------+-----------+------------+-----------+----------+
|         7.0|        3.2|         4.7|        1.4|Versicolor|
|         7.1|        3.0|         5.9|        2.1| Virginica|
|         7.6|        3.0|         6.6|        2.1| Virginica|
|         7.3|        2.9|         6.3|        1.8| Virginica|
|         7.2|        3.6|         6.1|        2.5| Virginica|
|         7.7|        3.8|         6.7|        2.2| Virginica|
|         7.7|        2.6|         6.9|        2.3| Virginica|
|         7.7|        2.8|         6.7|        2.0| Virginica|
|         7.2|        3.2|         6.0|        1.8| Virginica|
|         7.2|        3.0|         5.8|        1.6| Virginica|
|         7.4|        2.8|         6.1|        1.9| Virginica|
|         7.9|        3.8|         6.4|        2.0| Virginica|
|         7.7|        3.0|         6.1|        2.3| Vir

## 6. GroupBy and Aggregate

In [197]:
df_spark = spark.read.csv('data/test3.csv', header=True, inferSchema=True)

In [198]:
df_spark.show()

+--------+----+------+
|    Name| Job|Salary|
+--------+----+------+
|   Marco|  DS| 30000|
|Giovanni|  DS| 35000|
|    Luca|  DA| 12000|
|   Bruno|  DS| 76000|
|Telemaco|  DA| 32000|
|   Italo|JrDS| 27000|
| Giacomo|JrDS| 11200|
| Alfonso|JrDA| 11000|
| Rebecca|  DA| 25000|
|   Maria|JrDA| 15000|
+--------+----+------+



In [200]:
df_spark.groupBy('Job').mean().show()

+----+-----------+
| Job|avg(Salary)|
+----+-----------+
|  DA|    23000.0|
|JrDA|    13000.0|
|  DS|    47000.0|
|JrDS|    19100.0|
+----+-----------+



In [201]:
df_spark.groupBy('Job').count().show()

+----+-----+
| Job|count|
+----+-----+
|  DA|    3|
|JrDA|    2|
|  DS|    3|
|JrDS|    2|
+----+-----+



In [202]:
df_spark.groupBy('Job').sum().show()

+----+-----------+
| Job|sum(Salary)|
+----+-----------+
|  DA|      69000|
|JrDA|      26000|
|  DS|     141000|
|JrDS|      38200|
+----+-----------+



In [226]:
df_spark.groupBy('Job').agg({'Salary':'sum'}).show()

+----+-----------+
| Job|sum(Salary)|
+----+-----------+
|  DA|      69000|
|JrDA|      26000|
|  DS|     141000|
|JrDS|      38200|
+----+-----------+



## 7. Machine learning

In [228]:
df_spark = spark.read.csv('data/test4.csv', header=True, inferSchema=True)

In [229]:
df_spark.columns

['Name', 'Age', 'Experience', 'Salary']

In [231]:
predictors = ['Age', 'Experience']

In [235]:
from pyspark.ml.feature import VectorAssembler
featureAssembler = VectorAssembler(inputCols=predictors, outputCol='Independent_predictors')

In [236]:
output = featureAssembler.transform(df_spark)

In [237]:
output.show()

+--------+---+----------+------+----------------------+
|    Name|Age|Experience|Salary|Independent_predictors|
+--------+---+----------+------+----------------------+
|   Marco| 31|        10| 30000|           [31.0,10.0]|
|Giovanni| 30|         8| 25000|            [30.0,8.0]|
|    Luca| 29|         4| 20000|            [29.0,4.0]|
|   Bruno| 24|         3| 20000|            [24.0,3.0]|
|Telemaco| 21|         1| 15000|            [21.0,1.0]|
|   Italo| 23|         2| 18000|            [23.0,2.0]|
+--------+---+----------+------+----------------------+



In [239]:
# Keep only what the regressor needs: the assembled feature vector and the target.
final_df = output.select('Independent_predictors', 'Salary')
final_df.show()

+----------------------+------+
|Independent_predictors|Salary|
+----------------------+------+
|           [31.0,10.0]| 30000|
|            [30.0,8.0]| 25000|
|            [29.0,4.0]| 20000|
|            [24.0,3.0]| 20000|
|            [21.0,1.0]| 15000|
|            [23.0,2.0]| 18000|
+----------------------+------+



### Linear Regression

In [242]:
from pyspark.ml.regression import LinearRegression

# Pass an explicit seed so the 75/25 split -- and every coefficient and metric
# computed from it below -- is the same on each Restart & Run All.
# NOTE(review): the dataset has only six rows; check that the chosen seed leaves
# both train_data and test_data non-empty, and pick another seed if it does not.
train_data, test_data = final_df.randomSplit([0.75, 0.25], seed=42)

In [243]:
train_data.show()

+----------------------+------+
|Independent_predictors|Salary|
+----------------------+------+
|            [24.0,3.0]| 20000|
|            [29.0,4.0]| 20000|
|            [30.0,8.0]| 25000|
|           [31.0,10.0]| 30000|
+----------------------+------+



In [244]:
test_data.show()

+----------------------+------+
|Independent_predictors|Salary|
+----------------------+------+
|            [21.0,1.0]| 15000|
|            [23.0,2.0]| 18000|
+----------------------+------+



In [245]:
# Configure the linear model (feature vector -> Salary) and fit it on the training
# split in one expression; `regressor` ends up bound to the fitted model.
regressor = LinearRegression(
    featuresCol='Independent_predictors',
    labelCol='Salary',
).fit(train_data)

21/12/26 00:33:12 WARN Instrumentation: [a2b430dc] regParam is zero, which might cause numerical instability and overfitting.
21/12/26 00:33:12 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
21/12/26 00:33:12 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
21/12/26 00:33:12 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [246]:
regressor.coefficients

DenseVector([-383.9733, 1711.1853])

In [247]:
regressor.intercept

23998.330550919385

In [252]:
pred_results = regressor.evaluate(test_data)

In [253]:
pred_results.predictions.show()

+----------------------+------+------------------+
|Independent_predictors|Salary|        prediction|
+----------------------+------+------------------+
|            [21.0,1.0]| 15000| 17646.07679465793|
|            [23.0,2.0]| 18000|18589.315525876566|
+----------------------+------+------------------+



In [254]:
pred_results.meanAbsoluteError

1617.6961602672473

In [255]:
pred_results.meanSquaredError

3674507.5961331767

### Decision Tree

In [314]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [315]:
df_spark = spark.read.csv('data/iris.csv', header=True, inferSchema=True)

In [316]:
df_spark.show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| Setosa|
|         4.9|        3.0|         1.4|        0.2| Setosa|
|         4.7|        3.2|         1.3|        0.2| Setosa|
|         4.6|        3.1|         1.5|        0.2| Setosa|
|         5.0|        3.6|         1.4|        0.2| Setosa|
|         5.4|        3.9|         1.7|        0.4| Setosa|
|         4.6|        3.4|         1.4|        0.3| Setosa|
|         5.0|        3.4|         1.5|        0.2| Setosa|
|         4.4|        2.9|         1.4|        0.2| Setosa|
|         4.9|        3.1|         1.5|        0.1| Setosa|
|         5.4|        3.7|         1.5|        0.2| Setosa|
|         4.8|        3.4|         1.6|        0.2| Setosa|
|         4.8|        3.0|         1.4|        0.1| Setosa|
|         4.3|        3.0|         1.1| 

In [317]:
predictors = ['sepal_length','sepal_width','petal_length','petal_width']

In [318]:
# Encode the string column "variety" as a numeric index in a new column "variety_int".
# df_spark is reassigned here, so re-running this cell starts from the already-indexed frame.
df_spark = StringIndexer(inputCol="variety", outputCol="variety_int").fit(df_spark).transform(df_spark)
df_spark.show()

+------------+-----------+------------+-----------+-------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|variety|variety_int|
+------------+-----------+------------+-----------+-------+-----------+
|         5.1|        3.5|         1.4|        0.2| Setosa|        0.0|
|         4.9|        3.0|         1.4|        0.2| Setosa|        0.0|
|         4.7|        3.2|         1.3|        0.2| Setosa|        0.0|
|         4.6|        3.1|         1.5|        0.2| Setosa|        0.0|
|         5.0|        3.6|         1.4|        0.2| Setosa|        0.0|
|         5.4|        3.9|         1.7|        0.4| Setosa|        0.0|
|         4.6|        3.4|         1.4|        0.3| Setosa|        0.0|
|         5.0|        3.4|         1.5|        0.2| Setosa|        0.0|
|         4.4|        2.9|         1.4|        0.2| Setosa|        0.0|
|         4.9|        3.1|         1.5|        0.1| Setosa|        0.0|
|         5.4|        3.7|         1.5|        0.2| Setosa|     

In [319]:
from pyspark.ml.feature import VectorAssembler
featureAssembler = VectorAssembler(inputCols=predictors, outputCol='Independent_predictors')
df_spark = featureAssembler.transform(df_spark)
df_spark.show()

+------------+-----------+------------+-----------+-------+-----------+----------------------+
|sepal_length|sepal_width|petal_length|petal_width|variety|variety_int|Independent_predictors|
+------------+-----------+------------+-----------+-------+-----------+----------------------+
|         5.1|        3.5|         1.4|        0.2| Setosa|        0.0|     [5.1,3.5,1.4,0.2]|
|         4.9|        3.0|         1.4|        0.2| Setosa|        0.0|     [4.9,3.0,1.4,0.2]|
|         4.7|        3.2|         1.3|        0.2| Setosa|        0.0|     [4.7,3.2,1.3,0.2]|
|         4.6|        3.1|         1.5|        0.2| Setosa|        0.0|     [4.6,3.1,1.5,0.2]|
|         5.0|        3.6|         1.4|        0.2| Setosa|        0.0|     [5.0,3.6,1.4,0.2]|
|         5.4|        3.9|         1.7|        0.4| Setosa|        0.0|     [5.4,3.9,1.7,0.4]|
|         4.6|        3.4|         1.4|        0.3| Setosa|        0.0|     [4.6,3.4,1.4,0.3]|
|         5.0|        3.4|         1.5|        0.2

In [320]:
df_final = df_spark.select(['Independent_predictors', 'variety_int'])
df_final.show()

+----------------------+-----------+
|Independent_predictors|variety_int|
+----------------------+-----------+
|     [5.1,3.5,1.4,0.2]|        0.0|
|     [4.9,3.0,1.4,0.2]|        0.0|
|     [4.7,3.2,1.3,0.2]|        0.0|
|     [4.6,3.1,1.5,0.2]|        0.0|
|     [5.0,3.6,1.4,0.2]|        0.0|
|     [5.4,3.9,1.7,0.4]|        0.0|
|     [4.6,3.4,1.4,0.3]|        0.0|
|     [5.0,3.4,1.5,0.2]|        0.0|
|     [4.4,2.9,1.4,0.2]|        0.0|
|     [4.9,3.1,1.5,0.1]|        0.0|
|     [5.4,3.7,1.5,0.2]|        0.0|
|     [4.8,3.4,1.6,0.2]|        0.0|
|     [4.8,3.0,1.4,0.1]|        0.0|
|     [4.3,3.0,1.1,0.1]|        0.0|
|     [5.8,4.0,1.2,0.2]|        0.0|
|     [5.7,4.4,1.5,0.4]|        0.0|
|     [5.4,3.9,1.3,0.4]|        0.0|
|     [5.1,3.5,1.4,0.3]|        0.0|
|     [5.7,3.8,1.7,0.3]|        0.0|
|     [5.1,3.8,1.5,0.3]|        0.0|
+----------------------+-----------+
only showing top 20 rows



In [321]:
# Configure a decision-tree classifier on the assembled feature vector and the indexed label.
dtc = DecisionTreeClassifier(labelCol="variety_int", featuresCol="Independent_predictors")
# Fit on all of df_final (no train/test split is made in this section).
# Note that `dtc` is rebound here from the unfitted estimator to the fitted model.
dtc = dtc.fit(df_final)

In [322]:
df_final = dtc.transform(df_final)
df_final.show()

+----------------------+-----------+--------------+-------------+----------+
|Independent_predictors|variety_int| rawPrediction|  probability|prediction|
+----------------------+-----------+--------------+-------------+----------+
|     [5.1,3.5,1.4,0.2]|        0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [4.9,3.0,1.4,0.2]|        0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [4.7,3.2,1.3,0.2]|        0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [4.6,3.1,1.5,0.2]|        0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [5.0,3.6,1.4,0.2]|        0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [5.4,3.9,1.7,0.4]|        0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [4.6,3.4,1.4,0.3]|        0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [5.0,3.4,1.5,0.2]|        0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [4.4,2.9,1.4,0.2]|        0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [4.9,3.1,1.5,0.1]|        0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|

In [323]:
# Rename the target column to 'label'. Presumably this lets the evaluator built further
# down (constructed with only metricName) find the labels under its default column
# name -- TODO confirm against the evaluator's defaults.
df_final = df_final.withColumnRenamed('variety_int', 'label')
df_final.show()

+----------------------+-----+--------------+-------------+----------+
|Independent_predictors|label| rawPrediction|  probability|prediction|
+----------------------+-----+--------------+-------------+----------+
|     [5.1,3.5,1.4,0.2]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [4.9,3.0,1.4,0.2]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [4.7,3.2,1.3,0.2]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [4.6,3.1,1.5,0.2]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [5.0,3.6,1.4,0.2]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [5.4,3.9,1.7,0.4]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [4.6,3.4,1.4,0.3]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [5.0,3.4,1.5,0.2]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [4.4,2.9,1.4,0.2]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [4.9,3.1,1.5,0.1]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     [5.4,3.7,1.5,0.2]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|     

In [324]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [325]:
acc_eval = MulticlassClassificationEvaluator(metricName='accuracy')

In [326]:
# NOTE(review): df_final holds the same rows the tree was fitted on, so the value
# reported here is accuracy on the training data, not a held-out estimate.
acc_eval.evaluate(df_final)

1.0

In [1]:
import pandas as pd

ModuleNotFoundError: No module named 'pandas'

In [2]:
!pip freeze

absl-py==1.0.0
aiofiles==0.7.0
aiohttp==3.7.4.post0
amqp==5.0.6
anyio==3.3.4
appdirs==1.4.4
appnope==0.1.0
arch==4.19
argon2-cffi==20.1.0
astroid==2.3.3
astunparse==1.6.3
async-timeout==3.0.1
attrs==20.2.0
Babel==2.9.1
backcall==0.1.0
backports.entry-points-selectable==1.1.1
beautifulsoup4==4.9.3
billiard==3.6.4.0
bleach==3.1.5
blinker==1.4
Brotli==1.0.9
cachetools==4.1.0
cairocffi==1.2.0
CairoSVG==2.4.2
celery==5.1.2
certifi==2020.11.8
cffi==1.14.3
chardet==3.0.4
click==7.1.2
click-didyoumean==0.0.3
click-plugins==1.1.1
click-repl==0.2.0
clikit==0.6.2
cloudpickle==1.6.0
cmdstanpy==0.9.68
convertdate==2.3.2
crashtest==0.3.1
cssselect==1.1.0
cssselect2==0.4.1
cycler==0.10.0
Cython==0.29.24
dash==1.17.0
dash-bootstrap-components==0.10.7
dash-core-components==1.13.0
dash-extensions==0.0.65
dash-html-components==1.1.1
dash-renderer==1.8.3
dash-table==4.11.0
dask==2021.4.1
debugpy==1.4.1
decorator==4.4.1
defusedxml==0.6.0
diazo==1.4.1
distlib==0.3.3
distributed==2021.4.1
Django==1.7.11
djan