In [1]:
# import e creazione sessione
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MacagnoProject").getOrCreate()

In [3]:
# carico dati
import pandas as pd

df = spark.read.load("../data/Pisa_Operazioni_2015.csv", format="csv", header=True, inferSchema=True)

pd.DataFrame(df.take(5), columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4
Comune,Pisa,Pisa,Pisa,Pisa,Pisa
StazPrelievo,Pratale,Teatro Tribunale,Stazione F.S.,Porta a Lucca,Pratale
DataOraPrelievo,2015-12-31 21:35:40.987000,2015-12-31 19:44:21.223000,2015-12-31 18:42:29.193000,2015-12-31 18:24:42.163000,2015-12-31 16:43:48.687000
ColonninaPrelievo,14,7,27,6,13
StazDeposito,Stazione F.S.,Comune Palazzo Blu,Ospedale Cisanello,Vittorio Emanuele,Vittorio Emanuele
ColonninaDeposito,27,13,10,4,5
DataOraDeposito,2015-12-31 21:49:59.217000,2015-12-31 19:47:08.107000,2015-12-31 18:55:41.307000,2015-12-31 18:34:57.313000,2015-12-31 17:01:59.063000
IDBadge,126270,156631,188285,194690,156604
NumeroBadge,1775,2710,444,3972,2721


In [11]:
df.printSchema()

root
 |-- Comune: string (nullable = true)
 |-- StazPrelievo: string (nullable = true)
 |-- DataOraPrelievo: timestamp (nullable = true)
 |-- ColonninaPrelievo: integer (nullable = true)
 |-- StazDeposito: string (nullable = true)
 |-- ColonninaDeposito: integer (nullable = true)
 |-- DataOraDeposito: timestamp (nullable = true)
 |-- IDBadge: integer (nullable = true)
 |-- NumeroBadge: integer (nullable = true)



In [4]:
# udf per il preprocessing 

from dateutil.parser import parse
from pyspark.sql.functions import udf

def day_range(time):
    # 0 night 6 morning 12 afternoon 18 evening
    t = time.hour
    if t < 6:
        return 0
    elif t < 12:
        return 600
    elif t < 18:
        return 1200
    else:
        return 1800

day_range_udf = udf(day_range)
# maturity_udf = udf(return_age_bracket)

In [5]:
# day range e cast a Date di day

from pyspark.sql.types import *

df = df.withColumn("day_range", day_range_udf(df.DataOraPrelievo).cast("int"))
df = df.withColumnRenamed("DataOraPrelievo", "day")
df = df.withColumn("day", df.day.cast(DateType()))
# df.sort("day").show()

In [6]:
pd.DataFrame(df.sort("day").take(5), columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4
Comune,Pisa,Pisa,Pisa,Pisa,Pisa
StazPrelievo,Comune Palazzo Blu,Duomo,Vittorio Emanuele,Borgo Stretto,Porta a Lucca
day,2015-01-01,2015-01-01,2015-01-01,2015-01-01,2015-01-01
ColonninaPrelievo,4,10,8,1,7
StazDeposito,Porta a Lucca,Borgo Stretto,Porta a Lucca,Porta a Lucca,Comune Palazzo Blu
ColonninaDeposito,7,8,4,6,4
DataOraDeposito,2015-01-02 00:02:48.357000,2015-01-01 18:28:14.667000,2015-01-01 23:57:08.983000,2015-01-01 19:25:38.553000,2015-01-01 18:29:03.073000
IDBadge,164407,130050,151407,150809,164407
NumeroBadge,3419,1836,2406,2090,3419
day_range,1800,1800,1800,1800,1800


In [8]:
# leggi weather.csv
weather = spark.read.load("../data/calculated/weather_with_holidays.csv", format="csv", header=True, inferSchema=True)
weather.sort("day").show()
weather.printSchema()

+-------------------+---------+-------------+-----------+-----------+------+
|                day|day_range|      weather|day_of_week|celebration|season|
+-------------------+---------+-------------+-----------+-----------+------+
|2015-01-01 00:00:00|     1200|        Sunny|          5|          1|Winter|
|2015-01-01 00:00:00|     1800|        Sunny|          5|          1|Winter|
|2015-01-01 00:00:00|        0|        Clear|          5|          1|Winter|
|2015-01-01 00:00:00|      600|        Clear|          5|          1|Winter|
|2015-01-02 00:00:00|        0|        Clear|          6|          0|Winter|
|2015-01-02 00:00:00|      600|        Clear|          6|          0|Winter|
|2015-01-02 00:00:00|     1200|        Sunny|          6|          0|Winter|
|2015-01-02 00:00:00|     1800|        Sunny|          6|          0|Winter|
|2015-01-03 00:00:00|     1800|        Sunny|          7|          0|Winter|
|2015-01-03 00:00:00|      600|         Mist|          7|          0|Winter|

In [9]:
from pyspark.sql.functions import *

weather = weather.withColumn("day", weather.day.cast(DateType()))
weather = weather.withColumn("month", month('day').alias('month'))
weather = weather.withColumn("day_of_week", dayofweek('day').alias('day_of_week'))
weather = weather.withColumn("day_of_year", dayofmonth('day').alias('day_of_year'))
weather = weather.withColumn("celebration", weather.celebration.cast(BooleanType()))

In [10]:
pd.DataFrame(weather.sort(["day", "day_range"]).take(5), columns=weather.columns).transpose()

Unnamed: 0,0,1,2,3,4
day,2015-01-01,2015-01-01,2015-01-01,2015-01-01,2015-01-02
day_range,0,600,1200,1800,0
weather,Clear,Clear,Sunny,Sunny,Clear
day_of_week,5,5,5,5,6
celebration,True,True,True,True,False
season,Winter,Winter,Winter,Winter,Winter
month,1,1,1,1,1
day_of_year,1,1,1,1,2


In [11]:
# raggruppamento di pisa operazioni
df = df.groupBy(["day", "day_range"]).count()

In [12]:
pd.DataFrame(df.sort(["day", "day_range"]).take(5), columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4
day,2015-01-01,2015-01-01,2015-01-01,2015-01-01,2015-01-02
day_range,0,600,1200,1800,0
count,5,7,20,5,4


In [13]:
# tempo di JOIN

inner_join = df.join(weather, ["day", "day_range"])
inner_join.show()

+----------+---------+-----+--------------------+-----------+-----------+------+-----+-----------+
|       day|day_range|count|             weather|day_of_week|celebration|season|month|day_of_year|
+----------+---------+-----+--------------------+-----------+-----------+------+-----+-----------+
|2015-12-10|     1200|  255|               Sunny|          5|      false|Autumn|   12|         10|
|2015-11-24|        0|    3|       Partly cloudy|          3|      false|Autumn|   11|         24|
|2015-10-19|     1200|  361|               Sunny|          2|      false|Autumn|   10|         19|
|2015-10-02|      600|  218|   Light rain shower|          6|      false|Autumn|   10|          2|
|2015-09-12|     1200|   76|       Partly cloudy|          7|      false|Summer|    9|         12|
|2015-08-25|     1200|   84|               Sunny|          3|      false|Summer|    8|         25|
|2015-07-28|        0|   13|               Clear|          3|      false|Summer|    7|         28|
|2015-07-0

In [14]:
from pyspark.sql.types import *

# discretizing total_count 
from pyspark.ml.feature import QuantileDiscretizer

discretizer = QuantileDiscretizer(numBuckets=3, inputCol="count", outputCol="count_discretized")

discretized = discretizer.fit(inner_join).transform(inner_join)
# discretized_new = discretized.withColumn("count_discretized", discretized["count_discretized"].cast(IntegerType()))
discretized.show()
discretized.printSchema()

+----------+---------+-----+--------------------+-----------+-----------+------+-----+-----------+-----------------+
|       day|day_range|count|             weather|day_of_week|celebration|season|month|day_of_year|count_discretized|
+----------+---------+-----+--------------------+-----------+-----------+------+-----+-----------+-----------------+
|2015-12-10|     1200|  255|               Sunny|          5|      false|Autumn|   12|         10|              2.0|
|2015-11-24|        0|    3|       Partly cloudy|          3|      false|Autumn|   11|         24|              0.0|
|2015-10-19|     1200|  361|               Sunny|          2|      false|Autumn|   10|         19|              2.0|
|2015-10-02|      600|  218|   Light rain shower|          6|      false|Autumn|   10|          2|              2.0|
|2015-09-12|     1200|   76|       Partly cloudy|          7|      false|Summer|    9|         12|              1.0|
|2015-08-25|     1200|   84|               Sunny|          3|   

In [16]:
final = discretized.withColumnRenamed("count", "total_count")
final.show()
final.toPandas().to_csv('../data/calculated/final.csv')
final.printSchema()

+----------+---------+-----------+--------------------+-----------+-----------+------+-----+-----------+-----------------+
|       day|day_range|total_count|             weather|day_of_week|celebration|season|month|day_of_year|count_discretized|
+----------+---------+-----------+--------------------+-----------+-----------+------+-----+-----------+-----------------+
|2015-12-10|     1200|        255|               Sunny|          5|      false|Autumn|   12|         10|              2.0|
|2015-11-24|        0|          3|       Partly cloudy|          3|      false|Autumn|   11|         24|              0.0|
|2015-10-19|     1200|        361|               Sunny|          2|      false|Autumn|   10|         19|              2.0|
|2015-10-02|      600|        218|   Light rain shower|          6|      false|Autumn|   10|          2|              2.0|
|2015-09-12|     1200|         76|       Partly cloudy|          7|      false|Summer|    9|         12|              1.0|
|2015-08-25|    

In [17]:
print(df.count())
print(weather.count())
print(inner_join.count())
print(final.count())

1458
1460
1458
1458


# FINE
Qui sotto magari qualche statistica