# 1. Introduction

The main purpose of this notebook is to compare ways to open and process standard .csv files on your local machine.

We have two files, which we will open, inspect and transform with three different Python libraries:

- [__Pandas__](https://pandas.pydata.org/),
- [__Dask__](https://dask.org/),
- [__PySpark__](https://spark.apache.org/).



First, let's import the basic libraries:

In [1]:
from os import path
import pandas as pd

Now let's check the size of our data:

In [2]:
medium_file = path.expanduser("./data/raw/Crimes.csv")
big_file = path.expanduser("./data/raw/Vehicles.csv")

file_list = {"Crimes" : medium_file, 
             "Vehicles" : big_file,
            }

for key, value in file_list.items():
    # ">> 20" is an integer shift (bytes // 2**20), so sizes are truncated to whole MiB
    print(f"{key} file size: {path.getsize(value) >> 20:.2f} MB")

Crimes file size: 1725.00 MB
Vehicles file size: 4814.00 MB
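Because `>> 20` is an integer shift (the same as `// 2**20`), the sizes above are truncated to whole mebibytes (MiB), and the `.2f` format can only ever print `.00`. A minimal sketch of a helper that divides instead and keeps the fractional part; `file_size_mib` is a hypothetical name, not part of any library.

```python
import os
import tempfile


def file_size_mib(file_path: str) -> float:
    """Return a file's size in MiB (2**20 bytes), keeping the fractional part."""
    return os.path.getsize(file_path) / 2**20


# Create a throwaway file of exactly 1.5 MiB (3 * 2**19 zero bytes)
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"\0" * (3 * 2**19))

size = file_size_mib(tmp.name)
truncated = os.path.getsize(tmp.name) >> 20  # what the loop above computes
print(f"{size:.2f} MiB with division, {truncated} MiB with >> 20")
os.remove(tmp.name)  # clean up the temporary file
```

For the real files the shift loses less than 1 MiB, so the comparison is unaffected; the helper only matters when the decimals are meant to be meaningful.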


# 2. Pandas

We will start by loading the files into DataFrames with __Pandas__,<br>
an open-source library providing high-performance, easy-to-use data structures and data analysis tools for the Python programming language.
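When only some columns are needed, `pd.read_csv` can drop the rest with `usecols` and skip type inference with `dtype`, which usually reduces both load time and memory. A minimal sketch on an in-memory sample: the two rows are taken from the Crimes data shown later, reduced to four columns; the chosen dtypes are an assumption for illustration, not what the notebook uses.

```python
import io

import pandas as pd

# Two rows copied from the Crimes data, reduced to four columns
sample_csv = io.StringIO(
    "ID,Case Number,Description,Police Districts\n"
    "1392468,G106773,THEFT OF LABOR/SERVICES,15\n"
    "1310313,G000070,TO PROPERTY,9\n"
)

df = pd.read_csv(
    sample_csv,
    usecols=["ID", "Description", "Police Districts"],  # "Case Number" is dropped while reading
    dtype={
        "ID": "int64",
        "Description": "category",      # repeated labels are stored once
        "Police Districts": "float64",  # float so that missing districts can be NaN
    },
)
print(df.dtypes)
```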

#### 2.1. Medium DataFrame

##### Read data:

In [3]:
%%time
medium_df = pd.read_csv(medium_file)

Wall time: 43.4 s


##### Show last 2 rows:

In [4]:
%%time
medium_df.tail(2)

Wall time: 0 ns


|  | ID | Case Number | Date | Block | IUCR | Primary Type | Description | Location Description | Arrest | Domestic | ... | Longitude | Location | Historical Wards 2003-2015 | Zip Codes | Community Areas | Census Tracts | Wards | Boundaries - ZIP Codes | Police Districts | Police Beats |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 6877120 | 1392468 | G106773 | 01/01/2001 12:00:00 AM | 023XX W HURON ST | 1210 | DECEPTIVE PRACTICE | THEFT OF LABOR/SERVICES | RESIDENCE | False | False | ... | -87.686272 | (41.893824874, -87.686272003) | 24.0 | 21184.0 | 25.0 | 546.0 | 41.0 | 28.0 | 15.0 | 80.0 |
| 6877121 | 1310313 | G000070 | 01/01/2001 12:00:00 AM | 109XX S LONGWOOD DR | 1310 | CRIMINAL DAMAGE | TO PROPERTY | APARTMENT | False | False | ... | -87.671448 | (41.694977232, -87.671447845) | 33.0 | 22212.0 | 74.0 | 380.0 | 42.0 | 13.0 | 9.0 | 257.0 |


##### Number of rows, columns:

In [5]:
%%time
medium_df.shape

Wall time: 0 ns


(6877122, 30)

##### Basic statistics:

In [6]:
%%time
medium_df.describe()

Wall time: 8.47 s


|  | ID | Beat | District | Ward | Community Area | X Coordinate | Y Coordinate | Year | Latitude | Longitude | Historical Wards 2003-2015 | Zip Codes | Community Areas | Census Tracts | Wards | Boundaries - ZIP Codes | Police Districts | Police Beats |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 6877122.0 | 6877122.0 | 6877075.0 | 6262298.0 | 6263626.0 | 6811932.0 | 6811932.0 | 6877122.0 | 6811932.0 | 6811932.0 | 6792140.0 | 6811932.0 | 6794854.0 | 6797012.0 | 6794966.0 | 6794901.0 | 6795930.0 | 6795953.0 |
| mean | 6326030.0 | 1190.924 | 11.30045 | 22.69056 | 37.57498 | 1164523.0 | 1885727.0 | 2008.515 | 41.84203 | -87.67178 | 27.38247 | 19108.71 | 38.7246 | 381.3732 | 25.53626 | 31.46023 | 14.91913 | 150.5174 |
| std | 3097822.0 | 703.2927 | 6.945665 | 13.83333 | 21.53849 | 17155.58 | 32686.63 | 5.157344 | 0.08994461 | 0.06208706 | 15.26851 | 5736.594 | 20.09641 | 230.0487 | 14.77743 | 19.14224 | 6.45364 | 78.50072 |
| min | 634.0 | 111.0 | 1.0 | 1.0 | 0.0 | 0.0 | 0.0 | 2001.0 | 36.61945 | -91.68657 | 1.0 | 2733.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 |
| 25% | 3467769.0 | 622.0 | 6.0 | 10.0 | 23.0 | 1152936.0 | 1859189.0 | 2004.0 | 41.76891 | -87.71385 | 14.0 | 21184.0 | 25.0 | 176.0 | 12.0 | 15.0 | 10.0 | 83.0 |
| 50% | 6310243.0 | 1111.0 | 10.0 | 22.0 | 32.0 | 1166001.0 | 1890614.0 | 2008.0 | 41.85551 | -87.66615 | 27.0 | 21560.0 | 37.0 | 378.0 | 26.0 | 30.0 | 16.0 | 153.0 |
| 75% | 9000136.0 | 1731.0 | 17.0 | 34.0 | 57.0 | 1176352.0 | 1909288.0 | 2013.0 | 41.90682 | -87.62835 | 41.0 | 22243.0 | 58.0 | 577.0 | 37.0 | 50.0 | 20.0 | 221.0 |
| max | 11701390.0 | 2535.0 | 31.0 | 50.0 | 77.0 | 1205119.0 | 1951622.0 | 2019.0 | 42.02291 | -87.52453 | 53.0 | 26912.0 | 77.0 | 801.0 | 50.0 | 61.0 | 25.0 | 277.0 |


##### Conditional statements:

In [7]:
%%time
medium_df.loc[(medium_df["Community Areas"] > 50) & (medium_df["Boundaries - ZIP Codes"] < 60) & (medium_df["Description"] == "AUTOMOBILE")].head(2)

Wall time: 692 ms


|  | ID | Case Number | Date | Block | IUCR | Primary Type | Description | Location Description | Arrest | Domestic | ... | Longitude | Location | Historical Wards 2003-2015 | Zip Codes | Community Areas | Census Tracts | Wards | Boundaries - ZIP Codes | Police Districts | Police Beats |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 214 | 11694283 | JC271784 | 05/20/2019 06:00:00 PM | 009XX W DIVERSEY PKWY | 910 | MOTOR VEHICLE THEFT | AUTOMOBILE | RESIDENCE | False | False | ... | -87.651911 | (41.932671001, -87.651910528) | 38.0 | 21190.0 | 57.0 | 681.0 | 25.0 | 22.0 | 5.0 | 31.0 |
| 651 | 11693300 | JC270665 | 05/20/2019 04:00:00 AM | 071XX S GREEN ST | 910 | MOTOR VEHICLE THEFT | AUTOMOBILE | STREET | False | False | ... | -87.645607 | (41.764525607, -87.645606586) | 31.0 | 21559.0 | 66.0 | 512.0 | 32.0 | 11.0 | 17.0 | 215.0 |


##### Aggregations:

In [8]:
%%time
medium_df.groupby("Police Districts").agg("count").head(5)

Wall time: 5.27 s


| Police Districts | ID | Case Number | Date | Block | IUCR | Primary Type | Description | Location Description | Arrest | Domestic | ... | Latitude | Longitude | Location | Historical Wards 2003-2015 | Zip Codes | Community Areas | Census Tracts | Wards | Boundaries - ZIP Codes | Police Beats |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1.0 | 197490 | 197490 | 197490 | 197490 | 197490 | 197490 | 197490 | 197369 | 197490 | 197490 | ... | 197490 | 197490 | 197490 | 197489 | 197490 | 197489 | 197453 | 197489 | 197489 | 197490 |
| 2.0 | 119541 | 119540 | 119541 | 119541 | 119541 | 119541 | 119541 | 119393 | 119541 | 119541 | ... | 119541 | 119541 | 119541 | 119541 | 119541 | 119541 | 119541 | 119541 | 119541 | 119541 |
| 3.0 | 41 | 41 | 41 | 41 | 41 | 41 | 41 | 41 | 41 | 41 | ... | 41 | 41 | 41 | 41 | 41 | 0 | 41 | 2 | 41 | 41 |
| 4.0 | 1453 | 1453 | 1453 | 1453 | 1453 | 1453 | 1453 | 1453 | 1453 | 1453 | ... | 1453 | 1453 | 1453 | 1453 | 1453 | 0 | 505 | 0 | 6 | 1453 |
| 5.0 | 304851 | 304851 | 304851 | 304851 | 304851 | 304851 | 304851 | 304326 | 304851 | 304851 | ... | 304851 | 304851 | 304851 | 304851 | 304851 | 304851 | 304851 | 304851 | 304851 | 304851 |


We clear all variables from memory to avoid an out-of-memory error.<br>
Then we import the required libraries and create the variables once more.
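Before resetting, it can help to see how much memory a DataFrame actually holds. `DataFrame.memory_usage(deep=True)` also counts the Python string objects stored in `object` columns, which the default (shallow) count leaves out. A small sketch on made-up data; `frame_size_mib` is a hypothetical helper name, and `del` plus `gc.collect()` is shown only as a lighter-weight alternative to `%reset -f` for a single variable.

```python
import gc

import pandas as pd


def frame_size_mib(frame: pd.DataFrame) -> float:
    """In-memory size of a DataFrame in MiB, including the strings in object columns."""
    return frame.memory_usage(deep=True).sum() / 2**20


df = pd.DataFrame({
    "Description": pd.Series(["AUTOMOBILE", "TO PROPERTY"] * 50_000, dtype="object"),
    "Year": [2019, 2001] * 50_000,
})

shallow = df.memory_usage().sum()        # object columns: only the pointers are counted
deep = df.memory_usage(deep=True).sum()  # also counts the string objects themselves
print(f"shallow: {shallow / 2**20:.2f} MiB, deep: {frame_size_mib(df):.2f} MiB")

del df        # drop our reference; the frame is freed once no other references remain
gc.collect()  # run a collection to reclaim any reference cycles
```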

In [9]:
%reset -f

#### 2.2. Big DataFrame:

For the largest dataset we set __low_memory__ to __False__; with the default setting pandas emits a DtypeWarning<br>
because some columns contain mixed types.
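The warning appears when pandas infers different types for the same column in different parts of the file. Besides `low_memory=False`, declaring the type of the offending column up front avoids the guesswork; the Dask section later does exactly this for `rok_produkcji`. A sketch on a tiny in-memory sample: the column names come from the Vehicles file, but the values (including the text placeholder `BRAK`) are illustrative.

```python
import io

import pandas as pd

# Illustrative column mixing years and a text placeholder
sample_csv = io.StringIO("marka,rok_produkcji\nTOYOTA,2013\nCITROEN,1999\nFORD,BRAK\n")

# Declaring the type up front means pandas does not have to guess it
df = pd.read_csv(sample_csv, dtype={"rok_produkcji": "object"})
print(df["rok_produkcji"].tolist())  # ['2013', '1999', 'BRAK']

# Convert explicitly later; non-numeric entries become NaN
years = pd.to_numeric(df["rok_produkcji"], errors="coerce")
print(years.tolist())
```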

In [10]:
from os import path
import pandas as pd

big_file = path.expanduser("./data/raw/Vehicles.csv")

##### Read data:

In [11]:
%%time
big_df = pd.read_csv(big_file,
                     low_memory=False)

Wall time: 9min 18s


Our local computer can handle a file larger than 4 GB, but reading it is time-consuming compared with the processing steps<br>
and takes most of the time in our comparison.
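If the file did not fit in memory, or to keep peak memory low, an aggregation like the one below could be computed incrementally: with `chunksize`, `read_csv` returns an iterator of smaller DataFrames, and the partial results are combined. A sketch of a per-group count; the sample rows and the chunk size of 2 are illustrative only.

```python
import io

import pandas as pd

# Illustrative stand-in for Vehicles.csv
sample_csv = io.StringIO(
    "rodzaj,marka\n"
    "AUTOBUS,MERCEDES-BENZ\n"
    "SAMOCHÓD OSOBOWY,TOYOTA\n"
    "SAMOCHÓD OSOBOWY,FORD\n"
    "AUTOBUS,\n"
    "SAMOCHÓD OSOBOWY,CITROEN\n"
)

total = None
# chunksize makes read_csv yield DataFrames of at most 2 rows each
for chunk in pd.read_csv(sample_csv, chunksize=2):
    part = chunk.groupby("rodzaj").count()  # non-null values per column in each group
    # Accumulate; a group missing from one side is treated as 0
    total = part if total is None else total.add(part, fill_value=0)

total = total.astype("int64")
print(total)
```

Counts (and sums) combine correctly across chunks; statistics such as medians would need a different strategy.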

##### Show last 2 rows:

In [12]:
%%time
big_df.tail(2)

Wall time: 8.97 ms


|  | Unnamed: 0 | pojazd_id | marka | kategoria | typ | model | wariant | wersja | rodzaj | podrodzaj | ... | siedziba_wlasciciela_woj | siedziba_wlasciciela_pow | siedziba_wlasciciela_gmina | data_pierwszej_rej_w_kraju | createtimestamp | modifytimestamp | siedziba_wlasciciela_woj_teryt | akt_miejsce_rej_wojew_teryt | emisja_co2 | emisja_co2_pal_alternatywne1 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 13063685 | 6753092 | 80800114204160 | CITROEN |  |  | XANTIA |  |  | SAMOCHÓD OSOBOWY | KOMBI | ... | MAZOWIECKIE | LEGIONOWSKI | JABŁONNA | 1999-07-28 | 2019-03-15 | 2019-03-15 | 14.0 | 14 |  |  |
| 13063686 | 6753093 | 32820186672840 | TOYOTA | 13.0 |  | YARIS |  |  | SAMOCHÓD OSOBOWY | WIELOZADANIOWY | ... | MAZOWIECKIE | WARSZAWSKI ZACHODNI | STARE BABICE | 2013-10-29 | 2019-03-15 | 2019-03-15 | 14.0 | 14 | 104.0 |  |


##### Number of rows, columns:

In [13]:
%%time
big_df.shape

Wall time: 0 ns


(13063687, 72)

##### Basic statistics:

In [14]:
%%time
big_df.describe()

Wall time: 31.7 s


|  | Unnamed: 0 | pojazd_id | kategoria | pojemnosc_silnika | moc_do_masy | moc_silnika | moc_silnika_hybrydowego | masa_wlasna | masa_pgj | dopuszczalna_masa_calkowita | ... | rozstaw_kol_max | rozstaw_kol_sred | rozstaw_kol_min | emisja_co2_redukcja | wersja_rpp | kod_rpp | siedziba_wlasciciela_woj_teryt | akt_miejsce_rej_wojew_teryt | emisja_co2 | emisja_co2_pal_alternatywne1 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 13063690.0 | 13063690.0 | 4276366.0 | 12074010.0 | 1128423.0 | 9831770.0 | 12678.0 | 12516820.0 | 24215.0 | 12525750.0 | ... | 1365783.0 | 1365972.0 | 1366715.0 | 24.0 | 7951085.0 | 7951085.0 | 9811910.0 | 13063690.0 | 1896983.0 | 941.0 |
| mean | 2514995.0 | 365230900000000.0 | 12700040000000.0 | 1912.105 | 0.05682439 | 70.77183 | 57.085567 | 1434.54 | 2145.469131 | 2809.068 | ... | 1380.414 | 1549.918 | 1387.077 | 15.458333 | 6.874341 | 114585.1 | 12.70523 | 12.59851 | 135.7897 | 138.123273 |
| std | 1784089.0 | 1335397000000000.0 | 356144800000000.0 | 1941.93 | 1.198505 | 68.7427 | 34.860855 | 1855.161 | 3409.945596 | 5280.302 | ... | 3644.857 | 22988.55 | 7568.882 | 34.118692 | 3.533673 | 239364.7 | 2.371407 | 1.606318 | 102.6126 | 16.823227 |
| min | 0.0 | 2233634.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.9 | 0.0 | 1.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 4.0 | 1.0 | 1.0 | 0.0 | 10.0 | 0.0 | 96.0 |
| 25% | 1088640.0 | 26899650000000.0 | 13.0 | 1229.0 | 0.0 | 40.0 | 45.0 | 770.0 | 1205.0 | 1336.0 | ... | 1478.0 | 1467.0 | 1468.0 | 5.0 | 1.0 | 3.0 | 12.0 | 12.0 | 118.0 | 131.0 |
| 50% | 2177281.0 | 53777460000000.0 | 13.0 | 1596.0 | 0.0142 | 66.0 | 53.0 | 1159.0 | 1445.0 | 1690.0 | ... | 1536.0 | 1529.0 | 1525.0 | 5.0 | 9.0 | 3.0 | 14.0 | 14.0 | 135.0 | 132.0 |
| 75% | 3487172.0 | 80643250000000.0 | 11111110.0 | 1994.0 | 0.0387 | 92.0 | 60.0 | 1500.0 | 1885.0 | 2105.0 | ... | 1574.0 | 1567.0 | 1562.0 | 6.0 | 9.0 | 201000.0 | 14.0 | 14.0 | 154.0 | 155.0 |
| max | 6753093.0 | 9007160000000000.0 | 1e+16 | 99973.0 | 890.0 | 12902.0 | 999.0 | 911940.0 | 105535.0 | 920920.0 | ... | 2589258.0 | 8961559.0 | 5025828.0 | 132.0 | 9.0 | 1699000.0 | 73.0 | 14.0 | 121067.0 | 163.0 |


##### Conditional statements:

In [15]:
%%time
big_df.loc[((big_df["marka"] == "TOYOTA") | (big_df["marka"] == "FORD")) &  (big_df["emisja_co2"] < 100)].head(2)

Wall time: 2.24 s


|  | Unnamed: 0 | pojazd_id | marka | kategoria | typ | model | wariant | wersja | rodzaj | podrodzaj | ... | siedziba_wlasciciela_woj | siedziba_wlasciciela_pow | siedziba_wlasciciela_gmina | data_pierwszej_rej_w_kraju | createtimestamp | modifytimestamp | siedziba_wlasciciela_woj_teryt | akt_miejsce_rej_wojew_teryt | emisja_co2 | emisja_co2_pal_alternatywne1 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 868 | 868 | 51955064994346 | TOYOTA | 13.0 | XW3(A) | PRIUS | ZVW30(H) | ZVW30L-AHXEBW(2C) | SAMOCHÓD OSOBOWY | HATCHBACK | ... | ŁÓDZKIE | ŁÓDŹ | ŁÓDŹ | 2012-01-13 | 2019-03-15 | 2019-03-15 | 10.0 | 10 | 89.0 |  |
| 1620 | 1620 | 4125588963370607 | TOYOTA | 13.0 | XP13M(A) | YARIS | KSP13(MH) | KSP130L-CHMRKW(3L) | SAMOCHÓD OSOBOWY | WIELOZADANIOWY | ... |  |  |  | 2018-05-29 | 2019-03-15 | 2019-03-15 |  | 10 | 99.0 |  |


##### Aggregations:

In [16]:
%%time
big_df.groupby("rodzaj").agg("count").head(5)

Wall time: 44.8 s


| rodzaj | Unnamed: 0 | pojazd_id | marka | kategoria | typ | model | wariant | wersja | podrodzaj | przeznaczenie | ... | siedziba_wlasciciela_woj | siedziba_wlasciciela_pow | siedziba_wlasciciela_gmina | data_pierwszej_rej_w_kraju | createtimestamp | modifytimestamp | siedziba_wlasciciela_woj_teryt | akt_miejsce_rej_wojew_teryt | emisja_co2 | emisja_co2_pal_alternatywne1 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| AUTOBUS | 52082 | 52082 | 51424 | 15830 | 29143 | 50963 | 29158 | 29158 | 52027 | 49195 | ... | 38182 | 38499 | 38345 | 52014 | 52082 | 52082 | 38182 | 52082 | 1586 | 0 |
| AUTOBUSY | 3 | 3 | 3 | 0 | 0 | 3 | 0 | 0 | 3 | 0 | ... | 3 | 3 | 3 | 1 | 3 | 3 | 3 | 3 | 0 | 0 |
| BRAK DANYCH | 3 | 3 | 3 | 1 | 0 | 1 | 0 | 0 | 3 | 3 | ... | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 0 | 0 |
| CIAGN. ROLNICZY. | 1 | 1 | 1 | 0 | 0 | 1 | 0 | 0 | 1 | 0 | ... | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 0 | 0 |
| CIAGN. SAMOCHOD. | 2 | 2 | 2 | 0 | 0 | 2 | 0 | 0 | 2 | 0 | ... | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 0 | 0 |


In [17]:
%reset -f

# 3. Dask

Now we will perform the same operations with __Dask__, a library for parallel computing in Python.

#### 3.1. Medium DataFrame:

In [18]:
from os import path
import dask.dataframe as dd

medium_file = path.expanduser("./data/raw/Crimes.csv")

##### Read data:

In [19]:
%%time
medium_dd = dd.read_csv(medium_file,
                        assume_missing=True,
                        )

Wall time: 67.8 ms


Creating the DataFrame was much faster than with pandas, which may be surprising. However, Dask is based on __lazy evaluation__: `read_csv` only reads a small sample to infer the column types and prepares a task graph, and the actual work is done when a result is requested, e.g. with the __.compute()__ method (methods such as `.head()`, `.tail()` or `len()` trigger the computation themselves).<br>
The computation then runs block by block (partition by partition), executing independent tasks in parallel.

Still, let's time the same basic tasks we ran with pandas.

##### Show last 2 rows:

In [20]:
%%time
medium_dd.tail(2)

Wall time: 527 ms


|  | ID | Case Number | Date | Block | IUCR | Primary Type | Description | Location Description | Arrest | Domestic | ... | Longitude | Location | Historical Wards 2003-2015 | Zip Codes | Community Areas | Census Tracts | Wards | Boundaries - ZIP Codes | Police Districts | Police Beats |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 67808 | 1392468.0 | G106773 | 01/01/2001 12:00:00 AM | 023XX W HURON ST | 1210 | DECEPTIVE PRACTICE | THEFT OF LABOR/SERVICES | RESIDENCE | False | False | ... | -87.686272 | (41.893824874, -87.686272003) | 24.0 | 21184.0 | 25.0 | 546.0 | 41.0 | 28.0 | 15.0 | 80.0 |
| 67809 | 1310313.0 | G000070 | 01/01/2001 12:00:00 AM | 109XX S LONGWOOD DR | 1310 | CRIMINAL DAMAGE | TO PROPERTY | APARTMENT | False | False | ... | -87.671448 | (41.694977232, -87.671447845) | 33.0 | 22212.0 | 74.0 | 380.0 | 42.0 | 13.0 | 9.0 | 257.0 |


##### Number of rows, columns:

In [21]:
%%time
print(f"shape: ({len(medium_dd)}, {len(medium_dd.columns)})")

shape: (6877122, 30)
Wall time: 25.8 s


##### Basic statistics:

In [22]:
%%time
medium_dd.describe().compute()

Wall time: 1min 36s


|  | ID | Arrest | Domestic | Beat | District | Ward | Community Area | X Coordinate | Y Coordinate | Year | Latitude | Longitude | Historical Wards 2003-2015 | Zip Codes | Community Areas | Census Tracts | Wards | Boundaries - ZIP Codes | Police Districts | Police Beats |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 6877122.0 | 6877122.0 | 6877122.0 | 6877122.0 | 6877075.0 | 6262298.0 | 6263626.0 | 6811932.0 | 6811932.0 | 6877122.0 | 6811932.0 | 6811932.0 | 6792140.0 | 6811932.0 | 6794854.0 | 6797012.0 | 6794966.0 | 6794901.0 | 6795930.0 | 6795953.0 |
| mean | 6326030.0 | 0.2762272 | 0.13186 | 1190.924 | 11.30045 | 22.69056 | 37.57498 | 1164523.0 | 1885727.0 | 2008.515 | 41.84203 | -87.67178 | 27.38247 | 19108.71 | 38.7246 | 381.3732 | 25.53626 | 31.46023 | 14.91913 | 150.5174 |
| std | 3097822.0 | 0.4471306 | 0.3383385 | 703.2927 | 6.945665 | 13.83333 | 21.53849 | 17155.58 | 32686.63 | 5.157344 | 0.08994461 | 0.06208706 | 15.26851 | 5736.594 | 20.09641 | 230.0487 | 14.77743 | 19.14224 | 6.45364 | 78.50072 |
| min | 634.0 | 0.0 | 0.0 | 111.0 | 1.0 | 1.0 | 0.0 | 0.0 | 0.0 | 2001.0 | 36.61945 | -91.68657 | 1.0 | 2733.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 |
| 25% | 4274221.0 | 0.0 | 0.0 | 631.0 | 6.0 | 15.0 | 26.0 | 1153612.0 | 1861556.0 | 2005.0 | 41.77552 | -87.7113 | 15.0 | 21186.0 | 25.0 | 184.0 | 13.0 | 16.0 | 10.0 | 85.0 |
| 50% | 7994002.0 | 0.0 | 0.0 | 1123.0 | 10.0 | 24.0 | 52.0 | 1166922.0 | 1894366.0 | 2011.0 | 41.86585 | -87.66321 | 28.0 | 21569.0 | 37.0 | 382.0 | 27.0 | 30.0 | 16.0 | 157.0 |
| 75% | 11604070.0 | 1.0 | 0.0 | 1823.0 | 17.0 | 35.0 | 76.0 | 1176547.0 | 1910536.0 | 2019.0 | 41.91032 | -87.62772 | 41.0 | 22248.0 | 59.0 | 580.0 | 39.0 | 52.0 | 20.0 | 224.0 |
| max | 11701390.0 | 1.0 | 1.0 | 2535.0 | 31.0 | 50.0 | 77.0 | 1205119.0 | 1951622.0 | 2019.0 | 42.02291 | -87.52453 | 53.0 | 26912.0 | 77.0 | 801.0 | 50.0 | 61.0 | 25.0 | 277.0 |


##### Conditional statements:

In [23]:
%%time
medium_dd.loc[(medium_dd["Community Areas"] > 50) & (medium_dd["Boundaries - ZIP Codes"] < 60) & (medium_dd["Description"] == "AUTOMOBILE")].head(2)

Wall time: 1.6 s


|  | ID | Case Number | Date | Block | IUCR | Primary Type | Description | Location Description | Arrest | Domestic | ... | Longitude | Location | Historical Wards 2003-2015 | Zip Codes | Community Areas | Census Tracts | Wards | Boundaries - ZIP Codes | Police Districts | Police Beats |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 214 | 11694283.0 | JC271784 | 05/20/2019 06:00:00 PM | 009XX W DIVERSEY PKWY | 910 | MOTOR VEHICLE THEFT | AUTOMOBILE | RESIDENCE | False | False | ... | -87.651911 | (41.932671001, -87.651910528) | 38.0 | 21190.0 | 57.0 | 681.0 | 25.0 | 22.0 | 5.0 | 31.0 |
| 651 | 11693300.0 | JC270665 | 05/20/2019 04:00:00 AM | 071XX S GREEN ST | 910 | MOTOR VEHICLE THEFT | AUTOMOBILE | STREET | False | False | ... | -87.645607 | (41.764525607, -87.645606586) | 31.0 | 21559.0 | 66.0 | 512.0 | 32.0 | 11.0 | 17.0 | 215.0 |


##### Aggregations:

In [24]:
%%time
medium_dd.groupby("Police Districts").agg("count").head(5)

Wall time: 30 s


| Police Districts | ID | Case Number | Date | Block | IUCR | Primary Type | Description | Location Description | Arrest | Domestic | ... | Latitude | Longitude | Location | Historical Wards 2003-2015 | Zip Codes | Community Areas | Census Tracts | Wards | Boundaries - ZIP Codes | Police Beats |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1.0 | 197490 | 197490 | 197490 | 197490 | 197490 | 197490 | 197490 | 197369 | 197490 | 197490 | ... | 197490 | 197490 | 197490 | 197489 | 197490 | 197489 | 197453 | 197489 | 197489 | 197490 |
| 2.0 | 119541 | 119540 | 119541 | 119541 | 119541 | 119541 | 119541 | 119393 | 119541 | 119541 | ... | 119541 | 119541 | 119541 | 119541 | 119541 | 119541 | 119541 | 119541 | 119541 | 119541 |
| 3.0 | 41 | 41 | 41 | 41 | 41 | 41 | 41 | 41 | 41 | 41 | ... | 41 | 41 | 41 | 41 | 41 | 0 | 41 | 2 | 41 | 41 |
| 4.0 | 1453 | 1453 | 1453 | 1453 | 1453 | 1453 | 1453 | 1453 | 1453 | 1453 | ... | 1453 | 1453 | 1453 | 1453 | 1453 | 0 | 505 | 0 | 6 | 1453 |
| 5.0 | 304851 | 304851 | 304851 | 304851 | 304851 | 304851 | 304851 | 304326 | 304851 | 304851 | ... | 304851 | 304851 | 304851 | 304851 | 304851 | 304851 | 304851 | 304851 | 304851 | 304851 |


In [25]:
%reset -f

#### 3.2. Big DataFrame:

In [26]:
from os import path
import dask.dataframe as dd

big_file = path.expanduser("./data/raw/Vehicles.csv")

##### Read data:

In [27]:
%%time
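big_dd = dd.read_csv(big_file,
                     assume_missing=True, # integer columns without an explicit dtype are read as floats, so missing values fit
                     dtype={'rok_produkcji': 'object'}, # set because Dask's dtype inference (based on a sample) fails for this column
                     low_memory=False, # columns have mixed types and Dask gives us a warning
                     )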

Wall time: 166 ms


##### Show last 2 rows:

In [28]:
%%time
big_dd.tail(2)

Wall time: 1.96 s


|  | Unnamed: 0 | pojazd_id | marka | kategoria | typ | model | wariant | wersja | rodzaj | podrodzaj | ... | siedziba_wlasciciela_woj | siedziba_wlasciciela_pow | siedziba_wlasciciela_gmina | data_pierwszej_rej_w_kraju | createtimestamp | modifytimestamp | siedziba_wlasciciela_woj_teryt | akt_miejsce_rej_wojew_teryt | emisja_co2 | emisja_co2_pal_alternatywne1 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 149139 | 6753092.0 | 80800110000000.0 | CITROEN |  |  | XANTIA |  |  | SAMOCHÓD OSOBOWY | KOMBI | ... | MAZOWIECKIE | LEGIONOWSKI | JABŁONNA | 1999-07-28 | 2019-03-15 | 2019-03-15 | 14.0 | 14.0 |  |  |
| 149140 | 6753093.0 | 32820190000000.0 | TOYOTA | 13.0 |  | YARIS |  |  | SAMOCHÓD OSOBOWY | WIELOZADANIOWY | ... | MAZOWIECKIE | WARSZAWSKI ZACHODNI | STARE BABICE | 2013-10-29 | 2019-03-15 | 2019-03-15 | 14.0 | 14.0 | 104.0 |  |


##### Number of rows, columns:

In [29]:
%%time
print(f"shape: ({len(big_dd)}, {len(big_dd.columns)})")

shape: (13063687, 72)
Wall time: 1min 45s


##### Basic statistics:

In [30]:
%%time
big_dd.describe().compute()

ValueError: No non-trivial arrays found

At the time of this analysis, this seems to be caused by a bug in Dask (https://github.com/dask/dask/issues/2792),<br>
so we have to wait for it to be fixed.

##### Conditional statements:

In [31]:
%%time
big_dd.loc[((big_dd["marka"] == "TOYOTA") | (big_dd["marka"] == "FORD")) &  (big_dd["emisja_co2"] < 100)].head(2)

Wall time: 2.46 s


|  | Unnamed: 0 | pojazd_id | marka | kategoria | typ | model | wariant | wersja | rodzaj | podrodzaj | ... | siedziba_wlasciciela_woj | siedziba_wlasciciela_pow | siedziba_wlasciciela_gmina | data_pierwszej_rej_w_kraju | createtimestamp | modifytimestamp | siedziba_wlasciciela_woj_teryt | akt_miejsce_rej_wojew_teryt | emisja_co2 | emisja_co2_pal_alternatywne1 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 868 | 868.0 | 51955060000000.0 | TOYOTA | 13.0 | XW3(A) | PRIUS | ZVW30(H) | ZVW30L-AHXEBW(2C) | SAMOCHÓD OSOBOWY | HATCHBACK | ... | ŁÓDZKIE | ŁÓDŹ | ŁÓDŹ | 2012-01-13 | 2019-03-15 | 2019-03-15 | 10.0 | 10.0 | 89.0 |  |
| 1620 | 1620.0 | 4125589000000000.0 | TOYOTA | 13.0 | XP13M(A) | YARIS | KSP13(MH) | KSP130L-CHMRKW(3L) | SAMOCHÓD OSOBOWY | WIELOZADANIOWY | ... |  |  |  | 2018-05-29 | 2019-03-15 | 2019-03-15 |  | 10.0 | 99.0 |  |


##### Aggregations:

In [32]:
%%time
big_dd.groupby("rodzaj").agg("count").head(5)

Wall time: 2min 1s


| rodzaj | Unnamed: 0 | pojazd_id | marka | kategoria | typ | model | wariant | wersja | podrodzaj | przeznaczenie | ... | siedziba_wlasciciela_woj | siedziba_wlasciciela_pow | siedziba_wlasciciela_gmina | data_pierwszej_rej_w_kraju | createtimestamp | modifytimestamp | siedziba_wlasciciela_woj_teryt | akt_miejsce_rej_wojew_teryt | emisja_co2 | emisja_co2_pal_alternatywne1 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| AUTOBUS | 52082 | 52082 | 51424 | 15830 | 29143 | 50963 | 29158 | 29158 | 52027 | 49195 | ... | 38182 | 38499 | 38345 | 52014 | 52082 | 52082 | 38182 | 52082 | 1586 | 0 |
| AUTOBUSY | 3 | 3 | 3 | 0 | 0 | 3 | 0 | 0 | 3 | 0 | ... | 3 | 3 | 3 | 1 | 3 | 3 | 3 | 3 | 0 | 0 |
| BRAK DANYCH | 3 | 3 | 3 | 1 | 0 | 1 | 0 | 0 | 3 | 3 | ... | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 0 | 0 |
| CIAGN. ROLNICZY. | 1 | 1 | 1 | 0 | 0 | 1 | 0 | 0 | 1 | 0 | ... | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 0 | 0 |
| CIAGN. SAMOCHOD. | 2 | 2 | 2 | 0 | 0 | 2 | 0 | 0 | 2 | 0 | ... | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 0 | 0 |


In [1]:
%reset -f

# 4. Spark

For the last exercise we will run the same operations with __Spark__. Note that it will run in local mode (`local[*]`), not on a cluster.

In [2]:
from os import path
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id

In [3]:
medium_file = path.expanduser("./data/raw/Crimes.csv")

#### 4.1. Initialize Spark Session

In [4]:
%%time
session = SparkSession.builder.appName("traffic").master("local[*]").getOrCreate()
dataFrameReader = session.read

Wall time: 4.32 s


#### 4.2. Medium DataFrame

##### Read data:

In [5]:
%%time
medium_df = dataFrameReader.option("header","true").option("inferSchema",value=True).csv(medium_file)

Wall time: 12.5 s


##### Show first 2 rows:

Spark (in the version used here) has no tail function, so we will need a workaround (`DataFrame.tail()` was only added in Spark 3.0).<br>
Let's start by simply checking .head() of the DataFrame.

In [6]:
%%time
medium_df.head(2)

Wall time: 464 ms


[Row(ID=11694147, Case Number='JC271632', Date='05/20/2019 11:59:00 PM', Block='078XX S LAFLIN ST', IUCR='0486', Primary Type='BATTERY', Description='DOMESTIC BATTERY SIMPLE', Location Description='APARTMENT', Arrest=False, Domestic=True, Beat=612, District=6, Ward=17, Community Area=71, FBI Code='08B', X Coordinate=1167474, Y Coordinate=1852808, Year=2019, Updated On='05/27/2019 04:12:56 PM', Latitude=41.751652164, Longitude=-87.661857961, Location='(41.751652164, -87.661857961)', Historical Wards 2003-2015=17, Zip Codes=21554, Community Areas=70, Census Tracts=557, Wards=31, Boundaries - ZIP Codes=59, Police Districts=20, Police Beats=230),
 Row(ID=11694159, Case Number='JC271636', Date='05/20/2019 11:55:00 PM', Block='067XX S EBERHART AVE', IUCR='143A', Primary Type='WEAPONS VIOLATION', Description='UNLAWFUL POSS OF HANDGUN', Location Description='RESIDENCE', Arrest=False, Domestic=False, Beat=321, District=3, Ward=20, Community Area=42, FBI Code='15', X Coordinate=1180739, Y Coordi

##### Show last 2 rows:

To show the last rows of the DataFrame we need to sort it.<br>
However, there is no incremental index column to sort by, so we first create one with `monotonically_increasing_id()`.

In [7]:
%%time
medium_df = medium_df.withColumn("index", monotonically_increasing_id())
print(medium_df.sort(medium_df['index'].desc()).head(2))

[Row(ID=1310313, Case Number='G000070', Date='01/01/2001 12:00:00 AM', Block='109XX S LONGWOOD DR', IUCR='1310', Primary Type='CRIMINAL DAMAGE', Description='TO PROPERTY', Location Description='APARTMENT', Arrest=False, Domestic=False, Beat=2212, District=22, Ward=None, Community Area=None, FBI Code='14', X Coordinate=1165016, Y Coordinate=1832136, Year=2001, Updated On='08/17/2015 03:03:40 PM', Latitude=41.694977232, Longitude=-87.671447845, Location='(41.694977232, -87.671447845)', Historical Wards 2003-2015=33, Zip Codes=22212, Community Areas=74, Census Tracts=380, Wards=42, Boundaries - ZIP Codes=13, Police Districts=9, Police Beats=257, index=111669401745), Row(ID=1392468, Case Number='G106773', Date='01/01/2001 12:00:00 AM', Block='023XX W HURON ST', IUCR='1210', Primary Type='DECEPTIVE PRACTICE', Description='THEFT OF LABOR/SERVICES', Location Description='RESIDENCE', Arrest=False, Domestic=False, Beat=1313, District=12, Ward=None, Community Area=None, FBI Code='11', X Coordina

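The rows returned are the same as the last two rows shown by pandas and Dask (IDs 1310313 and 1392468), only in reverse order because we sort descending. The generated `index` values, however, are not consecutive row numbers (the largest is 111669401745), so they do not match the pandas index.<br>
We are not going to investigate this further, because it is not the purpose of this exercise.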
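The large index values follow from how `monotonically_increasing_id()` works: the PySpark documentation describes the IDs as increasing and unique but not consecutive, with the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. A pure-Python sketch decoding the value seen above; `decode_row_id` is our own helper, not a Spark function, and it assumes that documented bit layout.

```python
def decode_row_id(row_id: int) -> tuple[int, int]:
    """Split a monotonically_increasing_id() value into (partition id, row within partition)."""
    partition_id = row_id >> 33                   # upper 31 bits
    row_in_partition = row_id & ((1 << 33) - 1)   # lower 33 bits
    return partition_id, row_in_partition


print(decode_row_id(111669401745))  # (13, 252049)
```

So the last row sits in partition 13 at offset 252049 within it. A contiguous row number would need a different approach, for example `RDD.zipWithIndex()`, at the cost of an extra pass over the data.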

##### Number of rows, columns:

In [8]:
%%time
medium_df.count(), len(medium_df.columns)

Wall time: 2.99 s


(6877122, 31)

##### Basic statistics:

In Spark we can print rows vertically, which is much easier to read than the default table layout when a DataFrame has many columns.<br>
This option (`vertical=True` in `show()`) is only available in recent versions of Spark.
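For comparison, a similar record-per-block view of a wide pandas DataFrame can be obtained by transposing a few rows with `.T`, so that each original column becomes a row. A small sketch using two records from the filter results (only three of their columns):

```python
import pandas as pd

df = pd.DataFrame({
    "ID": [11694283, 11693300],
    "Case Number": ["JC271784", "JC270665"],
    "Description": ["AUTOMOBILE", "AUTOMOBILE"],
})

vertical = df.head(2).T  # one row per original column, one column per record
print(vertical.to_string())
```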

In [9]:
%%time
medium_df.describe().show(truncate=False, vertical=True)

-RECORD 0---------------------------------------------------
 summary                    | count                         
 ID                         | 6877122                       
 Case Number                | 6877118                       
 Date                       | 6877122                       
 Block                      | 6877122                       
 IUCR                       | 6877122                       
 Primary Type               | 6877122                       
 Description                | 6877122                       
 Location Description       | 6872099                       
 Beat                       | 6877122                       
 District                   | 6877075                       
 Ward                       | 6262298                       
 Community Area             | 6263626                       
 FBI Code                   | 6877122                       
 X Coordinate               | 6811932                       
 Y Coordinate           

##### Conditional statements:

In [10]:
%%time
medium_df.filter(medium_df["Community Areas"] > 50).filter(medium_df["Boundaries - ZIP Codes"] < 60).filter(medium_df["Description"] == "AUTOMOBILE").show(2, truncate=False, vertical=True)

-RECORD 0---------------------------------------------------
 ID                         | 11694283                      
 Case Number                | JC271784                      
 Date                       | 05/20/2019 06:00:00 PM        
 Block                      | 009XX W DIVERSEY PKWY         
 IUCR                       | 0910                          
 Primary Type               | MOTOR VEHICLE THEFT           
 Description                | AUTOMOBILE                    
 Location Description       | RESIDENCE                     
 Arrest                     | false                         
 Domestic                   | false                         
 Beat                       | 1935                          
 District                   | 19                            
 Ward                       | 43                            
 Community Area             | 7                             
 FBI Code                   | 07                            
 X Coordinate           

##### Aggregations:

In [11]:
%%time
medium_df.groupby("Police Districts").agg(F.count("Police Districts")).show(5)

+----------------+-----------------------+
|Police Districts|count(Police Districts)|
+----------------+-----------------------+
|              12|                 220911|
|              22|                 272220|
|            null|                      0|
|               1|                 197490|
|              13|                 458507|
+----------------+-----------------------+
only showing top 5 rows

Wall time: 9.33 s


We clear all variables again to free memory, then import the required libraries and create the variables once more.

In [12]:
%reset -f

#### 4.3. Big DataFrame:

We repeat the same steps for the large file. Unlike pandas, Spark needs no __low_memory__ workaround here: with __inferSchema__ enabled, it determines the column types by making an extra pass over the data.

In [1]:
from os import path
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id

big_file = path.expanduser("./data/raw/Vehicles.csv")

In [2]:
%%time
session = SparkSession.builder.appName("traffic").master("local[*]").getOrCreate()
dataFrameReader = session.read

Wall time: 4.11 s


##### Read data:

In [3]:
%%time
big_df = dataFrameReader.option("header","true").option("inferSchema",value=True).csv(big_file)

Wall time: 1min 44s


Spark handles the >4 GB file on our local machine as well, and reading it took 1min 44s,<br>
far less than the 9min 18s pandas needed.

##### Show first and last 2 rows:

In [4]:
%%time
big_df.head(2)

Wall time: 313 ms


[Row(_c0=0, pojazd_id=6649493940242937, marka='MERCEDES-BENZ', kategoria=13.0, typ=None, model='C 220 D 4MATIC', wariant=None, wersja=None, rodzaj='SAMOCHÓD OSOBOWY', podrodzaj='KARETA (SEDAN)', przeznaczenie='---', pochodzenie='NOWY ZAKUPIONY W KRAJU', rodzaj_tab_znamionowej='ORYGINALNA', rok_produkcji='2018', sposob_produkcji='FABRYCZNY', data_pierwszej_rej='2018-11-27', data_rejestracji_ost=datetime.datetime(2018, 11, 27, 0, 0), data_pierwszej_rej_za_granica=None, pojemnosc_silnika=1950.0, moc_do_masy=None, moc_silnika=143.0, moc_silnika_hybrydowego=None, masa_wlasna=1685.0, masa_pgj=None, dopuszczalna_masa_calkowita=2245.0, maksymalna_masa_calkowita=2245.0, dopuszczalna_ladownosc_calk=None, maksymalna_ladownosc_calk=None, dopuszczalna_masa_ciag_zesp=4115.0, liczba_osi=2.0, naj_dopuszczalny_nacisk_osi=11.0, naj_maksymalny_nacisk_osi=11.0, max_masa_przyczepy_z_hamulcem=1800.0, max_masa_przyczepy_bez_ham=750.0, liczba_miejsc_ogolem=5.0, liczba_miejsc_siedzacych=5.0, liczba_miejsc_stoj

In [5]:
%%time
big_df = big_df.withColumn("index", monotonically_increasing_id())
big_df.sort(big_df['index'].desc()).head(2)

Wall time: 1min 42s


##### Number of rows, columns:

In [6]:
%%time
big_df.count(), len(big_df.columns)

Wall time: 7.1 s


(13063687, 73)

##### Basic statistics:

In [7]:
%%time
big_df.describe().show(truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------------
 summary                        | count                             
 _c0                            | 13063687                          
 pojazd_id                      | 13063687                          
 marka                          | 12027721                          
 kategoria                      | 4276366                           
 typ                            | 7946194                           
 model                          | 12711395                          
 wariant                        | 7951045                           
 wersja                         | 7951036                           
 rodzaj                         | 13009184                          
 podrodzaj                      | 12918427                          
 przeznaczenie                  | 12524454                          
 pochodzenie                    | 12510782                          
 rodzaj_tab_znamionowej         | 

##### Conditional statements:

In [8]:
%%time
big_df.filter((big_df["marka"] == "TOYOTA") | (big_df["marka"] == "FORD")).filter(big_df["emisja_co2"] < 100).show(2, truncate=False, vertical=True)

-RECORD 0---------------------------------------------------
 _c0                            | 868                       
 pojazd_id                      | 51955064994346            
 marka                          | TOYOTA                    
 kategoria                      | 13.0                      
 typ                            | XW3(A)                    
 model                          | PRIUS                     
 wariant                        | ZVW30(H)                  
 wersja                         | ZVW30L-AHXEBW(2C)         
 rodzaj                         | SAMOCHÓD OSOBOWY          
 podrodzaj                      | HATCHBACK                 
 przeznaczenie                  | ---                       
 pochodzenie                    | NOWY ZAKUPIONY W KRAJU    
 rodzaj_tab_znamionowej         | ORYGINALNA                
 rok_produkcji                  | 2011                      
 sposob_produkcji               | FABRYCZNY                 
 data_pierwszej_rej     

##### Aggregations:

In [9]:
%%time
big_df.groupby("rodzaj").agg(F.count("rodzaj")).show(5)

+--------------------+-------------+
|              rodzaj|count(rodzaj)|
+--------------------+-------------+
|SAMOCHÓD CIĘŻAROW...|         5575|
|SAMOCHODY CIĘŻARO...|            1|
|PRZYCZ.SPECJALIZO...|        10496|
|PRZYCZEPA AUTOBUSOWA|           66|
|SAM.CIĘŻ.UNIWERSALNY|           10|
+--------------------+-------------+
only showing top 5 rows

Wall time: 24.1 s


In [10]:
%reset -f

# 5. Summary

Let's start by creating a summary table of the wall times for each operation:

In [14]:
import pandas as pd

times = {
    'pandas':["43.4s", "0ns", "0ns", "8.47s", "692ms", "5.27s", "9min18s", "8.97ms", "0ns", "31.7s", "2.24s", "44.8s"],
    'dask':["67.8ms", "527ms", "25.8s", "1min 36s", "1.6s", "30s", "166ms" ,"1.96s" ,"1min45s" ,"error" , "2.46s", "2min 1s"],
    'spark':["4.23s* + 12.5s", "27.2s", "2.99s", "1min 28s", "298ms", "9.33s", "4.11s* + 1min 44s", "1min 42s", "7.1s", "6min 38s", "376ms", "24.1s"]
}

index = ['medium_read', 'medium_tail', 'medium_shape', 'medium_stats', 'medium_filter', 'medium_aggregation', 'big_read', 'big_tail', 'big_shape', 'big_stats', 'big_filter', 'big_aggregation']
time_df = pd.DataFrame.from_dict(times)
time_df.index = index
time_df

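|                    | pandas  | dask     | spark             |
|--------------------|---------|----------|-------------------|
| medium_read        | 43.4s   | 67.8ms   | 4.23s* + 12.5s    |
| medium_tail        | 0ns     | 527ms    | 27.2s             |
| medium_shape       | 0ns     | 25.8s    | 2.99s             |
| medium_stats       | 8.47s   | 1min 36s | 1min 28s          |
| medium_filter      | 692ms   | 1.6s     | 298ms             |
| medium_aggregation | 5.27s   | 30s      | 9.33s             |
| big_read           | 9min18s | 166ms    | 4.11s* + 1min 44s |
| big_tail           | 8.97ms  | 1.96s    | 1min 42s          |
| big_shape          | 0ns     | 1min45s  | 7.1s              |
| big_stats          | 31.7s   | error    | 6min 38s          |
| big_filter         | 2.24s   | 2.46s    | 376ms             |
| big_aggregation    | 44.8s   | 2min 1s  | 24.1s             |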


In [25]:
times = {
    "pandas":["57.83s", "636.75s"],
    "dask":["153.99s", "230.59s**"],
    "spark":["144.55s", "639.69s"]
}

index = ['medium_total', 'big_total']
time_df = pd.DataFrame.from_dict(times)
time_df.index = index
time_df

|              | pandas  | dask      | spark   |
|--------------|---------|-----------|---------|
| medium_total | 57.83s  | 153.99s   | 144.55s |
| big_total    | 636.75s | 230.59s** | 639.69s |


** The __Dask__ big_total excludes the basic statistics step, which raised an error.
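The totals above are plain sums of the per-step wall times. A minimal sketch of recomputing them from the first summary dictionary instead of adding them up by hand is shown below. It assumes the string formats used in the table (`min`, `s`, `ms`, `ns`), treats the trailing `*` marker as ignorable, and treats `error` as "no measurement". The helper names `to_seconds` and `total` are hypothetical.

```python
import re

# Seconds per unit suffix used in the summary table
UNITS = {"min": 60.0, "s": 1.0, "ms": 1e-3, "ns": 1e-9}


def to_seconds(value):
    """Convert strings like '1min 36s', '67.8ms' or '4.23s* + 12.5s' to seconds.

    Returns None when no timing was recorded (e.g. 'error').
    """
    # 'min', 'ms' and 'ns' are tried before 's' so the longer suffixes win
    parts = re.findall(r"(\d+(?:\.\d+)?)\s*(min|ms|ns|s)", value)
    if not parts:
        return None
    return sum(float(number) * UNITS[unit] for number, unit in parts)


def total(values):
    """Sum the timings that exist, skipping failed steps."""
    seconds = [to_seconds(v) for v in values]
    return round(sum(s for s in seconds if s is not None), 2)


# Values copied from the first summary table (6 medium steps, then 6 big steps)
times = {
    "pandas": ["43.4s", "0ns", "0ns", "8.47s", "692ms", "5.27s",
               "9min18s", "8.97ms", "0ns", "31.7s", "2.24s", "44.8s"],
    "dask": ["67.8ms", "527ms", "25.8s", "1min 36s", "1.6s", "30s",
             "166ms", "1.96s", "1min45s", "error", "2.46s", "2min 1s"],
    "spark": ["4.23s* + 12.5s", "27.2s", "2.99s", "1min 28s", "298ms", "9.33s",
              "4.11s* + 1min 44s", "1min 42s", "7.1s", "6min 38s", "376ms", "24.1s"],
}

totals = {lib: (total(v[:6]), total(v[6:])) for lib, v in times.items()}
for lib, (medium, big) in totals.items():
    print(f"{lib}: medium_total={medium}s, big_total={big}s")
```

With these inputs the sketch reproduces the totals in the second table, including the __Dask__ big_total that skips the failed statistics step.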

So we can see that __Pandas__ reads the whole dataset into memory first; after that, it can run most of the analytics you need in a very short time<br>(general statistics take the most time to process).

__Dask__ and __Spark__ use __lazy evaluation__, which means that execution does not start until an action (for example `.show()` in __Spark__ or `.compute()` in __Dask__) is triggered.<br>This makes them a better fit when you know exactly which results you need and which specific queries you want to run, such as getting the shape of the dataset or filtering the data.
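To make the idea concrete, here is a toy sketch of lazy evaluation in plain Python. The `LazyTable` class is hypothetical and does not reflect how __Dask__ or __Spark__ are implemented internally; it only illustrates the pattern: transformations are recorded as a plan, and no data is touched until an action (`compute()` here) runs that plan.

```python
class LazyTable:
    """Toy lazy pipeline (hypothetical, not the Dask or Spark API)."""

    def __init__(self, rows, steps=None):
        self._rows = rows
        self._steps = steps or []  # recorded transformations, not yet executed

    def where(self, predicate):
        # Transformation: return a new plan with one more step; no work is done
        return LazyTable(self._rows, self._steps + [("where", predicate)])

    def apply(self, func):
        # Transformation: record a per-row function for later
        return LazyTable(self._rows, self._steps + [("apply", func)])

    def compute(self):
        # Action: only now are the recorded steps applied, row by row
        result = iter(self._rows)
        for kind, func in self._steps:
            result = filter(func, result) if kind == "where" else map(func, result)
        return list(result)


# Hypothetical rows loosely modelled on the vehicles data
rows = [
    {"marka": "TOYOTA", "emisja_co2": 89},
    {"marka": "FORD", "emisja_co2": 150},
    {"marka": "FORD", "emisja_co2": 95},
]

calls = []  # records every predicate evaluation


def low_emission(row):
    calls.append(row["marka"])
    return row["emisja_co2"] < 100


plan = LazyTable(rows).where(low_emission).apply(lambda row: row["marka"])
print(len(calls))  # 0: building the plan did not evaluate anything
result = plan.compute()
print(result)      # ['TOYOTA', 'FORD']
print(len(calls))  # 3: each row was checked once, during compute()
```

Real engines go further: because the whole plan is known before execution, they can optimize it (for example, by pushing filters down or reading only the needed columns) before any data is read.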

If you need summary statistics for both datasets, you have to choose between __Pandas__ and __Spark__, as __Dask__ threw an error on the big dataset.<br>Also remember that in some cases you will not be able to load more than 4 GB of data into __Pandas__ (this happened on our machine a few times as well!);<br>whether it works depends on available memory (as a workaround, you can read the files in chunks and process the data on the fly).
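A minimal sketch of the chunked workaround, assuming the `chunksize` option of `pd.read_csv`: with it set, the file is read as an iterator of smaller DataFrames, and only the running result is kept in memory. The in-memory sample below is hypothetical and stands in for a large file such as `Vehicles.csv`; `count_in_chunks` is a hypothetical helper name.

```python
import io

import pandas as pd

# Hypothetical sample standing in for a large file on disk;
# in practice you would pass the file path instead of a StringIO buffer
sample_csv = io.StringIO(
    "marka,rodzaj\n"
    "TOYOTA,SAMOCHÓD OSOBOWY\n"
    "FORD,SAMOCHÓD OSOBOWY\n"
    "FORD,SAMOCHÓD CIĘŻAROWY\n"
    "TOYOTA,SAMOCHÓD OSOBOWY\n"
    "FIAT,MOTOCYKL\n"
)


def count_in_chunks(source, column, chunksize=2):
    """Count values of `column` without loading the whole file at once."""
    totals = pd.Series(dtype="int64")
    # With chunksize set, read_csv yields DataFrames of at most `chunksize` rows
    for chunk in pd.read_csv(source, usecols=[column], chunksize=chunksize):
        # Merge this chunk's partial counts; fill_value=0 covers keys seen on one side only
        totals = totals.add(chunk[column].value_counts(), fill_value=0)
    return totals.astype("int64").sort_values(ascending=False)


counts = count_in_chunks(sample_csv, "rodzaj")
print(counts)
```

The same pattern works for any aggregation that can be combined from partial results (counts, sums, min/max); statistics such as the median need a different approach because they cannot be merged chunk by chunk.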

To sum up, if you're working with small datasets, or exploring a sample of a larger dataset, __Pandas__ is the right choice,<br>but once your data grows beyond that, you should start thinking about parallelism.
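One way to explore a sample of a large file with __Pandas__ is sketched below. It assumes that a random ~5% of rows is representative enough for exploration. `pd.read_csv` accepts a callable for `skiprows`, which is evaluated against each row index, so rows can be dropped while reading instead of after loading everything. The sample data and the 5% fraction are hypothetical.

```python
import io
import random

import pandas as pd

# Hypothetical 1,000-row file standing in for a large CSV on disk
csv_text = "id,value\n" + "".join(f"{i},{i * 10}\n" for i in range(1000))

rng = random.Random(42)  # fixed seed so the sample is reproducible
fraction = 0.05          # keep roughly 5% of the data rows

# Row index 0 is the header line, so it is always kept;
# every other row is skipped unless the random draw falls below `fraction`
sample = pd.read_csv(
    io.StringIO(csv_text),
    skiprows=lambda i: i > 0 and rng.random() >= fraction,
)
print(len(sample), list(sample.columns))
```

The file is still scanned from start to end, but only the sampled rows are turned into a DataFrame, which keeps memory usage proportional to the sample size.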

__Dask__ is the preferable choice when you want to stay within the __Pandas__ ecosystem and keep using its commands.

When you need advanced SQL queries, we strongly recommend integrating your data analytics pipeline with a cloud analytics platform<br>such as __Databricks__, which can significantly boost computing speed by running your processes on clusters of machines.<br>
Then use __Spark__, as it is an all-in-one solution that ["achieves high performance for both batch and streaming data, using a state-of-the-art DAG scheduler, a query optimizer, and a physical execution engine"](https://spark.apache.org/).<br>You will, of course, need to get used to a different way of displaying data (as seen in our example), but it's not that difficult :)