# White wine quality analysis with PySpark

In [42]:
pip install pyspark



## Session builder

In [43]:
from pyspark.sql import SparkSession
# Create (or reuse) the Spark session named 'white_wine' used by later cells.
spark = (
    SparkSession.builder
    .appName('white_wine')
    .getOrCreate()
)

## Import data

In [44]:
# Load the CSV: the first line provides column names, types are inferred.
df = spark.read.csv(
    'winequality-white.csv',
    header=True,
    inferSchema=True,
)

## Show and schema

In [45]:
# Preview the first five rows.
df.show(n=5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.0|            0.27|       0.36|          20.7|    0.045|               45.0|               170.0|  1.001| 3.0|     0.45|    8.8|      6|
|          6.3|             0.3|       0.34|           1.6|    0.049|               14.0|               132.0|  0.994| 3.3|     0.49|    9.5|      6|
|          8.1|            0.28|        0.4|           6.9|     0.05|               30.0|                97.0| 0.9951|3.26|     0.44|   10.1|      6|
|          7.2|            0.23|       0.32|           8.5|    0.058|               47.0|           

In [46]:
# Print each column's name, inferred type and nullability.
df.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [47]:
# Show summary statistics (count, mean, stddev, min, max) for the columns.
df.describe().show()

+-------+------------------+------------------+-------------------+-----------------+--------------------+-------------------+--------------------+--------------------+-------------------+-------------------+------------------+------------------+
|summary|     fixed acidity|  volatile acidity|        citric acid|   residual sugar|           chlorides|free sulfur dioxide|total sulfur dioxide|             density|                 pH|          sulphates|           alcohol|           quality|
+-------+------------------+------------------+-------------------+-----------------+--------------------+-------------------+--------------------+--------------------+-------------------+-------------------+------------------+------------------+
|  count|              4898|              4897|               4897|             4898|                4898|               4897|                4897|                4896|               4898|               4898|              4898|              4898|
|   mean| 6.

In [143]:
# Imported here because, in notebook order, this cell comes before the cell
# that first imports `f`; without it a fresh-kernel "Run All" fails with
# NameError on `f`.
from pyspark.sql import functions as f

# Summary statistics for pH, density and alcohol, rounded for display.
# Each statistic column is cast to a number before format_number() rounds it
# to a fixed number of decimals.
desc = df.describe()
desc = desc.select(
    desc['summary'],
    f.format_number(desc['pH'].cast('float'), 2).alias('pH'),
    f.format_number(desc['density'].cast('float'), 3).alias('density'),
    f.format_number(desc['alcohol'].cast('float'), 2).alias('alcohol'),
)
desc.show()

+-------+--------+---------+--------+
|summary|      pH|  density| alcohol|
+-------+--------+---------+--------+
|  count|4,898.00|4,896.000|4,898.00|
|   mean|    3.19|    0.994|   10.51|
| stddev|    0.15|    0.003|    1.23|
|    min|    2.72|    0.987|    8.00|
|    max|    3.82|    1.039|   14.20|
+-------+--------+---------+--------+



## Add columns

In [48]:
# Derive (total - free) sulfur dioxide as an extra column and preview five rows.
# The result is only displayed; `df` itself is not reassigned.
residual_sulphur = df["total sulfur dioxide"] - df["free sulfur dioxide"]
df.withColumn('residual_sulphur', residual_sulphur).show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+----------------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|residual_sulphur|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+----------------+
|          7.0|            0.27|       0.36|          20.7|    0.045|               45.0|               170.0|  1.001| 3.0|     0.45|    8.8|      6|           125.0|
|          6.3|             0.3|       0.34|           1.6|    0.049|               14.0|               132.0|  0.994| 3.3|     0.49|    9.5|      6|           118.0|
|          8.1|            0.28|        0.4|           6.9|     0.05|               30.0|                97.0| 0.9951|3.26|     0.44|   10.1|      6|            67.0

## Filter

In [49]:
# Collect every row whose fixed acidity exceeds 10, then inspect the first
# match as a plain dict.
high_acidity = df["fixed acidity"] > 10
rows_acid = df.where(high_acidity).collect()
rows_acid[0].asDict()

{'alcohol': 9.9,
 'chlorides': 0.049,
 'citric acid': 0.88,
 'density': 0.9968,
 'fixed acidity': 10.2,
 'free sulfur dioxide': 20.0,
 'pH': 2.99,
 'quality': 4,
 'residual sugar': 6.2,
 'sulphates': 0.51,
 'total sulfur dioxide': 124.0,
 'volatile acidity': 0.44}

## Aggregates and order

In [50]:
from pyspark.sql import functions as f
# Alcohol statistics per quality score, ordered by quality.
alcohol_stats = [
    f.avg('alcohol').alias('mean_alcohol'),
    f.max('alcohol').alias('max_alcohol'),
    f.min('alcohol').alias('min_alcohol'),
    f.count('alcohol').alias('population'),
]
df.groupby('quality').agg(*alcohol_stats).sort('quality').show()

+-------+------------------+-----------+-----------+----------+
|quality|      mean_alcohol|max_alcohol|min_alcohol|population|
+-------+------------------+-----------+-----------+----------+
|      3|            10.345|       12.6|        8.0|        20|
|      4| 10.15245398773007|       13.5|        8.4|       163|
|      5| 9.808840082361009|       13.6|        8.0|      1457|
|      6|10.575371549893866|       14.0|        8.5|      2198|
|      7|11.367935606060604|       14.2|        8.6|       880|
|      8|11.636000000000003|       14.0|        8.5|       175|
|      9|             12.18|       12.9|       10.4|         5|
+-------+------------------+-----------+-----------+----------+



In [51]:
# Expose `df` to Spark SQL as the view 'wine', then fetch the five rows with
# the lowest alcohol value.
df.createOrReplaceTempView('wine')
query = spark.sql(
    "SELECT * FROM wine ORDER BY alcohol LIMIT 5"
)
query.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          4.2|           0.215|       0.23|           5.1|    0.041|               64.0|               157.0|0.99688|3.42|     0.44|    8.0|      3|
|          4.5|            0.19|       0.21|          0.95|    0.033|               89.0|               159.0|0.99332|3.34|     0.42|    8.0|      5|
|          6.2|            0.31|       0.23|           3.3|    0.052|               34.0|               113.0|0.99429|3.16|     0.48|    8.4|      5|
|          6.2|            0.31|       0.23|           3.3|    0.052|               34.0|           

In [52]:
# Count how many distinct quality scores occur.
unique_qualities = f.countDistinct('quality').alias('unique_qualities')
df.select(unique_qualities).show()

+----------------+
|unique_qualities|
+----------------+
|               7|
+----------------+



In [144]:
# Largest and smallest pH in the data set.
ph_column = df.pH
df.select([f.max(ph_column), f.min(ph_column)]).show()

+-------+-------+
|max(pH)|min(pH)|
+-------+-------+
|   3.82|   2.72|
+-------+-------+



In [53]:
# Standard deviation of pH, kept as a one-row DataFrame for the next cell.
ph_stddev = df.select(
    f.stddev('pH').alias('pH_stddev')
)
ph_stddev.show()

+-------------------+
|          pH_stddev|
+-------------------+
|0.15100059961506673|
+-------------------+



In [54]:
# Display the pH standard deviation formatted to three decimals.
rounded_stddev = f.format_number('pH_stddev', 3).alias('pH_stddev_rounded')
ph_stddev.select(rounded_stddev).show()

+-----------------+
|pH_stddev_rounded|
+-----------------+
|            0.151|
+-----------------+



In [55]:
# Five rows with the highest pH.
df.sort(df.pH.desc()).show(n=5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          5.3|            0.26|       0.23|          5.15|    0.034|               48.0|               160.0| 0.9952|3.82|     0.51|   10.5|      7|
|          6.4|            0.22|       0.34|           1.8|    0.057|               29.0|               104.0| 0.9959|3.81|     0.57|   10.3|      6|
|          6.3|             0.2|       0.24|           1.7|    0.052|               36.0|               135.0|0.99374| 3.8|     0.66|   10.8|      6|
|          5.7|            0.27|       0.32|           1.2|    0.046|               20.0|           

## Correlation

In [145]:
# Correlation coefficient between pH and alcohol.
ph_alcohol_corr = f.corr('pH', 'alcohol')
df.select(ph_alcohol_corr).show()

+-------------------+
|  corr(pH, alcohol)|
+-------------------+
|0.12143209874913044|
+-------------------+



## Missing data

In [57]:
from pyspark.sql.functions import isnan, when, count, col
# For every column, count the rows whose value is NaN or null.
null_counts = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in df.columns
]
df.select(null_counts).show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density| pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+
|            0|               1|          1|             0|        0|                  1|                   1|      2|  0|        0|      0|      0|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+



In [75]:
# Small working sample: rows missing density or citric acid, plus any row
# whose pH equals 3.82.
sample_condition = (
    df.density.isNull()
    | df['citric acid'].isNull()
    | (df.pH == 3.82)
)
df_with_miss = df.where(sample_condition)
df_with_miss.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          6.2|            0.16|       0.33|           1.1|    0.057|               21.0|                82.0|   null|3.32|     0.46|   10.9|      7|
|          7.1|            0.44|       null|          11.8|    0.044|               52.0|               152.0| 0.9975|3.12|     0.46|    8.7|      6|
|          5.3|            0.26|       0.23|          5.15|    0.034|               48.0|               160.0| 0.9952|3.82|     0.51|   10.5|      7|
|          7.2|            null|        0.4|           6.3|    0.047|               null|           

In [76]:
# Drop every row that contains at least one null.
df_with_miss.dropna().show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          5.3|            0.26|       0.23|          5.15|    0.034|               48.0|               160.0| 0.9952|3.82|     0.51|   10.5|      7|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+



In [80]:
# Drop only the rows whose density is null; nulls in other columns are kept.
df_with_miss.dropna(subset=['density']).show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.1|            0.44|       null|          11.8|    0.044|               52.0|               152.0| 0.9975|3.12|     0.46|    8.7|      6|
|          5.3|            0.26|       0.23|          5.15|    0.034|               48.0|               160.0| 0.9952|3.82|     0.51|   10.5|      7|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+



In [81]:
# Replace every null with 0.
df_with_miss.fillna(0).show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          6.2|            0.16|       0.33|           1.1|    0.057|               21.0|                82.0|    0.0|3.32|     0.46|   10.9|      7|
|          7.1|            0.44|        0.0|          11.8|    0.044|               52.0|               152.0| 0.9975|3.12|     0.46|    8.7|      6|
|          5.3|            0.26|       0.23|          5.15|    0.034|               48.0|               160.0| 0.9952|3.82|     0.51|   10.5|      7|
|          7.2|             0.0|        0.4|           6.3|    0.047|                0.0|           

In [86]:
# Mean density computed over the full `df`, used to fill the missing densities
# in the sample. first() returns the single aggregate Row directly, so
# `avg_density` is only ever bound to the numeric mean and never temporarily
# to a list of Rows.
avg_density = df.select(f.mean(df.density)).first()[0]
df_with_miss.na.fill(avg_density, subset=['density']).show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+------------------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|           density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+------------------+----+---------+-------+-------+
|          6.2|            0.16|       0.33|           1.1|    0.057|               21.0|                82.0|0.9940276470588172|3.32|     0.46|   10.9|      7|
|          7.1|            0.44|       null|          11.8|    0.044|               52.0|               152.0|            0.9975|3.12|     0.46|    8.7|      6|
|          5.3|            0.26|       0.23|          5.15|    0.034|               48.0|               160.0|            0.9952|3.82|     0.51|   10.5|      7|
|          7.2|            null|  

## Datetime (Apple stock data)

In [123]:
# Load the stock CSV (header row, inferred types) and inspect its schema.
df_apple = spark.read.csv(
    'appl_stock.csv',
    inferSchema=True,
    header=True,
)
df_apple.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [124]:
from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType

def convert_date(column):
    """Return a date Column parsed from a 'yyyy-MM-dd' string column.

    Replaces the previous Python UDF with Spark's built-in ``to_date``:
    the parsing runs inside Spark instead of row by row in Python, and a
    null input no longer raises inside ``datetime.strptime``.

    Args:
        column: Column or column name holding dates as 'yyyy-MM-dd' strings.

    Returns:
        A Column of Spark date values.
    """
    # Local import keeps this cell runnable regardless of which earlier cell
    # introduced the `f` alias.
    from pyspark.sql import functions as f

    return f.to_date(column, 'yyyy-MM-dd')


df_apple = df_apple.withColumn('Date', convert_date('Date'))

In [125]:
# Fetch the first row (as a one-element list) to check the converted Date.
df_apple.take(1)

[Row(Date=datetime.date(2010, 1, 4), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)]

In [126]:
# Add each row's week-of-year number as column 'week', then preview.
week_number = f.weekofyear('Date')
df_apple = df_apple.withColumn('week', week_number)
df_apple.show(n=5)

+----------+----------+----------+------------------+------------------+---------+------------------+----+
|      Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|week|
+----------+----------+----------+------------------+------------------+---------+------------------+----+
|2010-01-04|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|   1|
|2010-01-05|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|   1|
|2010-01-06|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|   1|
|2010-01-07|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|   1|
|2010-01-08|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|   1|
+----------+----------+----------+------------------+------------------+---------+------------------+----+
only showing top 5 rows



In [148]:
# Per week number: the maximum Open and Close, listed in week order.
weekly_max = df_apple.groupby('week').max()
weekly_open_close = weekly_max.select(['week', 'max(Open)', 'max(Close)'])
weekly_open_close.orderBy('week').show(5)

+----+----------+-----------------+
|week| max(Open)|       max(Close)|
+----+----------+-----------------+
|   1|557.460022|       561.019997|
|   2|546.800018|       543.929993|
|   3|554.900017|557.3599929999999|
|   4|554.000023|       556.179993|
|   5|    550.07|       550.500023|
+----+----------+-----------------+
only showing top 5 rows

