# Dask

Dask is my personal favourite for working with datasets that are larger than main memory. 
Dask contains several sub-packages for different kinds of data: DataFrames, arrays, and JSON documents can all be processed with Dask.<br>
<a href='https://dask.pydata.org/en/latest/'> Dask API </a>

Where do I get Dask? 
http://dask.pydata.org/en/latest/install.html
or pip install dask / conda install dask

Dask offers DataFrames in the familiar pandas format. The figure shows how Dask works: it splits the input file into several partitions (smaller pandas DataFrames) and processes them partition by partition, so the whole file never has to fit into memory at once.<br>
<img src='dask-dataframe.png'>

Dask Array is modelled on the NumPy array. It is chunked along several dimensions, not only along the length like the DataFrame.<br>
<img src='dask-array-black-text.png'>
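
A small sketch of a chunked array, assuming a 4x6 array of ones cut into 2x3 chunks (shape and chunk sizes are arbitrary choices for illustration):

```python
import dask.array as da

# A 4x6 array split into a 2x2 grid of chunks, each chunk 2x3.
x = da.ones((4, 6), chunks=(2, 3))

# Operations are lazy until compute() is called.
total = x.sum()
col_means = x.mean(axis=0).compute()

print(x.numblocks)        # number of chunks along each dimension
print(float(total.compute()))
print(col_means.shape)
```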

For JSON and unstructured data you can use Dask Bag.
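
As a rough sketch of how that can look, the following parses a few JSON lines with Dask Bag. The records are invented for illustration, and the synchronous scheduler is chosen here only to keep the example deterministic and lightweight.

```python
import json
import dask.bag as db

# Hypothetical JSON lines, e.g. as they would appear in a .jsonl file.
lines = [
    '{"vendorid": 1, "Total_amount": 11.8}',
    '{"vendorid": 1, "Total_amount": 24.3}',
    '{"vendorid": 2, "Total_amount": 20.8}',
    '{"vendorid": 2, "Total_amount": 10.3}',
]

# Build a bag from the raw strings and parse each one into a dict.
bag = db.from_sequence(lines, npartitions=2).map(json.loads)

# Keep only the expensive trips and pull out a single field.
expensive = bag.filter(lambda rec: rec["Total_amount"] > 15).pluck("vendorid")

result = expensive.compute(scheduler="synchronous")
print(result)
```

For files on disk, `db.read_text` followed by `.map(json.loads)` follows the same pattern.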

How does Dask work in general?<br>
Dask first reads only a small sample of the data to give the user an overview (column names and dtypes).
You can always look at the first rows with DataFrame.head(), but you never see the whole DataFrame at once.
In the example we use the taxi dataset with 100,000 rows from earlier.
Dask can work with much larger data, though.<br>
If you want to try it out: <a href='https://data.cityofnewyork.us/Transportation/2015-Yellow-Taxi-Trip-Data/ba8s-jw6u'> NYC Open Data Taxi</a>
160 million rows, 10 GB, could be processed without any problems.


In [1]:
import dask.dataframe as dd
df = dd.read_csv(r"taxi_hunderttausend.csv",error_bad_lines=False)
df

Unnamed: 0_level_0,No.,vendorid,pickup_datetime,dropoff_datetime,Store_and_fwd_flag,rate_code,Pickup_longitude,Pickup_latitude,Dropoff_longitude,Dropoff_latitude,Passenger_count,Trip_distance,Fare_amount,Extra,MTA_tax,Tip_amount,Tolls_amount,Ehail_fee,Improvement_surcharge,Total_amount,Payment_type,Trip_type
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1
,int64,int64,object,object,object,int64,float64,float64,float64,float64,int64,float64,float64,float64,float64,float64,float64,float64,float64,float64,int64,int64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


The file could be too large to display as a whole DataFrame. That is why we always use head() to show the first rows.

In [2]:
col = df.columns
col

Index(['No.', 'vendorid', 'pickup_datetime', 'dropoff_datetime',
       'Store_and_fwd_flag', 'rate_code', 'Pickup_longitude',
       'Pickup_latitude', 'Dropoff_longitude', 'Dropoff_latitude',
       'Passenger_count', 'Trip_distance', 'Fare_amount', 'Extra', 'MTA_tax',
       'Tip_amount', 'Tolls_amount', 'Ehail_fee', 'Improvement_surcharge',
       'Total_amount', 'Payment_type', 'Trip_type'],
      dtype='object')

In [3]:
df.head(10)

Unnamed: 0,No.,vendorid,pickup_datetime,dropoff_datetime,Store_and_fwd_flag,rate_code,Pickup_longitude,Pickup_latitude,Dropoff_longitude,Dropoff_latitude,...,Fare_amount,Extra,MTA_tax,Tip_amount,Tolls_amount,Ehail_fee,Improvement_surcharge,Total_amount,Payment_type,Trip_type
0,1,1,03/12/2015 09:26:05 AM,03/12/2015 09:33:58 AM,N,1,-73.950478,40.826733,-73.971558,40.795403,...,10.0,0.0,0.5,1.0,0.0,,0.3,11.8,1,1
1,2,1,05/29/2015 03:27:07 AM,05/29/2015 03:45:28 AM,N,1,-73.914124,40.823772,-73.975517,40.751713,...,23.0,0.5,0.5,0.0,0.0,,0.3,24.3,2,1
2,3,2,11/24/2015 08:07:46 PM,11/24/2015 08:31:35 PM,N,1,-73.952751,40.72718,-73.971313,40.69326,...,19.5,0.5,0.5,0.0,0.0,,0.3,20.8,2,1
3,4,2,12/18/2015 06:46:25 PM,12/18/2015 06:53:06 PM,N,1,-73.960274,40.720379,-73.956093,40.73299,...,6.5,1.0,0.5,2.0,0.0,,0.3,10.3,1,1
4,5,2,09/22/2015 06:32:36 PM,09/22/2015 06:34:44 PM,N,1,-73.829956,40.713699,-73.823364,40.713306,...,4.0,1.0,0.5,0.0,0.0,,0.3,5.8,2,1
5,6,2,05/15/2015 08:42:44 PM,05/15/2015 08:56:03 PM,N,1,-73.964096,40.710148,-73.994087,40.695229,...,13.5,0.5,0.5,0.0,0.0,,0.3,14.8,2,1
6,7,2,09/02/2015 02:38:54 PM,09/02/2015 02:45:25 PM,N,1,-73.83049,40.759331,-73.843788,40.771461,...,7.0,0.0,0.5,1.56,0.0,,0.3,9.36,1,1
7,8,2,03/29/2015 04:56:26 AM,03/29/2015 05:36:26 AM,N,1,-73.955406,40.721008,-74.005081,40.739941,...,29.0,0.5,0.5,6.06,0.0,,0.3,36.36,1,1
8,9,2,05/30/2015 04:22:47 AM,05/30/2015 04:42:57 AM,N,1,-73.956276,40.72171,-73.906715,40.694427,...,16.0,0.5,0.5,3.46,0.0,,0.3,20.76,1,1
9,10,2,03/29/2015 01:46:20 PM,03/29/2015 01:56:56 PM,N,1,-73.954079,40.811382,-73.945732,40.833076,...,9.0,0.0,0.5,0.0,0.0,,0.3,9.8,2,1


In [4]:
import sys
objectsize = sys.getsizeof(df)
objectsize

56

The Dask DataFrame object is only 56 bytes in size. It contains no data, as the output of df above shows. It is merely a reference that describes the structure and location of the file.

In [5]:
df = df[df['Total_amount']>15]

In [6]:
df.head(10)

Unnamed: 0,No.,vendorid,pickup_datetime,dropoff_datetime,Store_and_fwd_flag,rate_code,Pickup_longitude,Pickup_latitude,Dropoff_longitude,Dropoff_latitude,...,Fare_amount,Extra,MTA_tax,Tip_amount,Tolls_amount,Ehail_fee,Improvement_surcharge,Total_amount,Payment_type,Trip_type
1,2,1,05/29/2015 03:27:07 AM,05/29/2015 03:45:28 AM,N,1,-73.914124,40.823772,-73.975517,40.751713,...,23.0,0.5,0.5,0.0,0.0,,0.3,24.3,2,1
2,3,2,11/24/2015 08:07:46 PM,11/24/2015 08:31:35 PM,N,1,-73.952751,40.72718,-73.971313,40.69326,...,19.5,0.5,0.5,0.0,0.0,,0.3,20.8,2,1
7,8,2,03/29/2015 04:56:26 AM,03/29/2015 05:36:26 AM,N,1,-73.955406,40.721008,-74.005081,40.739941,...,29.0,0.5,0.5,6.06,0.0,,0.3,36.36,1,1
8,9,2,05/30/2015 04:22:47 AM,05/30/2015 04:42:57 AM,N,1,-73.956276,40.72171,-73.906715,40.694427,...,16.0,0.5,0.5,3.46,0.0,,0.3,20.76,1,1
10,11,2,08/30/2015 09:31:46 PM,08/30/2015 10:04:30 PM,N,1,-73.844215,40.72142,-73.858116,40.723831,...,24.5,0.5,0.5,0.0,0.0,,0.3,25.8,2,1
15,16,2,06/02/2015 11:50:25 AM,06/02/2015 12:15:20 PM,N,1,-73.956245,40.747505,-73.969193,40.750237,...,18.0,0.0,0.5,0.0,0.0,,0.3,18.8,2,1
19,20,2,03/27/2015 12:57:02 AM,03/27/2015 01:12:57 AM,N,1,-73.955009,40.72237,-73.968163,40.684406,...,14.0,0.5,0.5,3.06,0.0,,0.3,18.36,1,1
23,24,2,07/27/2015 02:21:58 PM,07/27/2015 02:33:22 PM,N,1,-73.950211,40.827045,-73.979424,40.789413,...,12.0,0.0,0.5,2.56,0.0,,0.3,15.36,1,1
24,25,2,03/11/2015 11:48:20 AM,03/11/2015 12:02:10 PM,N,1,-73.96698,40.70583,-73.978256,40.686993,...,12.0,0.0,0.5,2.56,0.0,,0.3,15.36,1,1
29,30,2,06/12/2015 06:17:40 PM,06/12/2015 06:32:03 PM,N,1,-73.860786,40.833641,-73.928329,40.808998,...,16.5,1.0,0.5,0.0,0.0,,0.3,18.3,2,1


Let's compute the average fare (Total_amount).

In [8]:
x = df.Total_amount.mean()
x

dd.Scalar<series-..., dtype=float64>

Why is mean() only a Scalar object?
df is still just a reference, and mean() is likewise just an instruction. To turn these instructions and structures into an actual value, an explicit request to execute them is still missing.<br>
That request is compute().


In [9]:
x.compute()

26.10853930217154

The references have been turned into an actual value.

# How does compute() affect other operations?

Example: we apply the filter Total_amount > 15 and materialise the result as a DataFrame right away.

In [10]:
df = dd.read_csv(r"taxi_hunderttausend.csv",error_bad_lines=False)
df = df[df['Total_amount']>15].compute()
df.head(10)

Unnamed: 0,No.,vendorid,pickup_datetime,dropoff_datetime,Store_and_fwd_flag,rate_code,Pickup_longitude,Pickup_latitude,Dropoff_longitude,Dropoff_latitude,...,Fare_amount,Extra,MTA_tax,Tip_amount,Tolls_amount,Ehail_fee,Improvement_surcharge,Total_amount,Payment_type,Trip_type
1,2,1,05/29/2015 03:27:07 AM,05/29/2015 03:45:28 AM,N,1,-73.914124,40.823772,-73.975517,40.751713,...,23.0,0.5,0.5,0.0,0.0,,0.3,24.3,2,1
2,3,2,11/24/2015 08:07:46 PM,11/24/2015 08:31:35 PM,N,1,-73.952751,40.72718,-73.971313,40.69326,...,19.5,0.5,0.5,0.0,0.0,,0.3,20.8,2,1
7,8,2,03/29/2015 04:56:26 AM,03/29/2015 05:36:26 AM,N,1,-73.955406,40.721008,-74.005081,40.739941,...,29.0,0.5,0.5,6.06,0.0,,0.3,36.36,1,1
8,9,2,05/30/2015 04:22:47 AM,05/30/2015 04:42:57 AM,N,1,-73.956276,40.72171,-73.906715,40.694427,...,16.0,0.5,0.5,3.46,0.0,,0.3,20.76,1,1
10,11,2,08/30/2015 09:31:46 PM,08/30/2015 10:04:30 PM,N,1,-73.844215,40.72142,-73.858116,40.723831,...,24.5,0.5,0.5,0.0,0.0,,0.3,25.8,2,1
15,16,2,06/02/2015 11:50:25 AM,06/02/2015 12:15:20 PM,N,1,-73.956245,40.747505,-73.969193,40.750237,...,18.0,0.0,0.5,0.0,0.0,,0.3,18.8,2,1
19,20,2,03/27/2015 12:57:02 AM,03/27/2015 01:12:57 AM,N,1,-73.955009,40.72237,-73.968163,40.684406,...,14.0,0.5,0.5,3.06,0.0,,0.3,18.36,1,1
23,24,2,07/27/2015 02:21:58 PM,07/27/2015 02:33:22 PM,N,1,-73.950211,40.827045,-73.979424,40.789413,...,12.0,0.0,0.5,2.56,0.0,,0.3,15.36,1,1
24,25,2,03/11/2015 11:48:20 AM,03/11/2015 12:02:10 PM,N,1,-73.96698,40.70583,-73.978256,40.686993,...,12.0,0.0,0.5,2.56,0.0,,0.3,15.36,1,1
29,30,2,06/12/2015 06:17:40 PM,06/12/2015 06:32:03 PM,N,1,-73.860786,40.833641,-73.928329,40.808998,...,16.5,1.0,0.5,0.0,0.0,,0.3,18.3,2,1


In [11]:
objectsize = sys.getsizeof(df)
objectsize

12622484

In [12]:
x = df.Total_amount.mean()
x

26.10853930217154

# Why does compute() exist at all?

You can chain several operations without running the computation, which can take extremely long with 10 GB of data.
If you accidentally write maen() instead of mean() and the filter df > 15 had to be executed first, you would only get the error after the DataFrame had been built. 

In [13]:
df = dd.read_csv(r"taxi_hunderttausend.csv",error_bad_lines=False)
df = df[df['Total_amount']>15]
y = df.maen()
y.compute()

AttributeError: 'DataFrame' object has no attribute 'maen'

df is still just a reference, yet the error is raised without df having been computed first.

To write the DataFrame to CSV we need df.to_csv. The file name needs a * (here .*.), which Dask replaces with the partition number so that each partition is saved to its own file.

In [14]:
df = dd.read_csv(r"taxi_hunderttausend.csv",error_bad_lines=False)

# Saving a Dask DataFrame

In [15]:
df.to_csv(r"taxipartition.*.csv",compression = 'xz',mode = 'a')

You can also compress the files with compression='&lt;compression of your choice&gt;', and mode determines what happens when a file with the same name already exists: 
    r for reading,
    w for writing,
    r+ opens for reading and writing (cannot truncate a file),
    w+ for writing and reading (can truncate a file),
    rb+ reading or writing a binary file,
    wb+ writing a binary file,
    a opens for appending,
    x open for exclusive creation, failing if the file already exists (Python 3)
<img src='compression.png'>
and this is what the file looks like in Excel after compression
<img src='compression2.png'>

This is what the folder looks like when you work with larger files and save them.<br>
The data now exists both split into partitions and as the original single file. Keep in mind that this takes a lot of disk space.<br>
Dask has no problem loading the individual partitions as one dataset.

<img src='partition.png'>

How can I continue working with the individual partitions?
Just point to the target folder and replace .001, .002, ... in the file name with .*.

In [16]:
df2 = dd.read_csv(r"taxipartition.*.csv",compression = 'xz')
df2.head(10)

Please ensure that each individual file can fit in memory and
use the keyword ``blocksize=None to remove this message``
Setting ``blocksize=None``
  "Setting ``blocksize=None``" % compression)


Unnamed: 0.1,Unnamed: 0,No.,vendorid,pickup_datetime,dropoff_datetime,Store_and_fwd_flag,rate_code,Pickup_longitude,Pickup_latitude,Dropoff_longitude,...,Fare_amount,Extra,MTA_tax,Tip_amount,Tolls_amount,Ehail_fee,Improvement_surcharge,Total_amount,Payment_type,Trip_type
0,0,1,1,03/12/2015 09:26:05 AM,03/12/2015 09:33:58 AM,N,1,-73.950478,40.826733,-73.971558,...,10.0,0.0,0.5,1.0,0.0,,0.3,11.8,1,1
1,1,2,1,05/29/2015 03:27:07 AM,05/29/2015 03:45:28 AM,N,1,-73.914124,40.823772,-73.975517,...,23.0,0.5,0.5,0.0,0.0,,0.3,24.3,2,1
2,2,3,2,11/24/2015 08:07:46 PM,11/24/2015 08:31:35 PM,N,1,-73.952751,40.72718,-73.971313,...,19.5,0.5,0.5,0.0,0.0,,0.3,20.8,2,1
3,3,4,2,12/18/2015 06:46:25 PM,12/18/2015 06:53:06 PM,N,1,-73.960274,40.720379,-73.956093,...,6.5,1.0,0.5,2.0,0.0,,0.3,10.3,1,1
4,4,5,2,09/22/2015 06:32:36 PM,09/22/2015 06:34:44 PM,N,1,-73.829956,40.713699,-73.823364,...,4.0,1.0,0.5,0.0,0.0,,0.3,5.8,2,1
5,5,6,2,05/15/2015 08:42:44 PM,05/15/2015 08:56:03 PM,N,1,-73.964096,40.710148,-73.994087,...,13.5,0.5,0.5,0.0,0.0,,0.3,14.8,2,1
6,6,7,2,09/02/2015 02:38:54 PM,09/02/2015 02:45:25 PM,N,1,-73.83049,40.759331,-73.843788,...,7.0,0.0,0.5,1.56,0.0,,0.3,9.36,1,1
7,7,8,2,03/29/2015 04:56:26 AM,03/29/2015 05:36:26 AM,N,1,-73.955406,40.721008,-74.005081,...,29.0,0.5,0.5,6.06,0.0,,0.3,36.36,1,1
8,8,9,2,05/30/2015 04:22:47 AM,05/30/2015 04:42:57 AM,N,1,-73.956276,40.72171,-73.906715,...,16.0,0.5,0.5,3.46,0.0,,0.3,20.76,1,1
9,9,10,2,03/29/2015 01:46:20 PM,03/29/2015 01:56:56 PM,N,1,-73.954079,40.811382,-73.945732,...,9.0,0.0,0.5,0.0,0.0,,0.3,9.8,2,1


In [17]:
import sys
objectsize = sys.getsizeof(df2)
objectsize

56

The size of the object is unchanged; the compression only exists on disk.<br>
But as we know, the bottleneck when working with large data is the read speed of the disk, and smaller files help there.

<a href='http://dask.pydata.org/en/latest/dataframe-api.html#store-dataframes'> Dask API read_csv/to_csv</a>

With the following snippet you can combine the partition files into one large file, for example to send it on.

In [18]:
from glob import glob
filenames = glob(r"Data\yellow_tripdata_2015-01-06longer10.*.csv")
with open(r"Data\yellow_tripdata_2015-01-06longer10Full.csv", 'w') as out:
    for fn in filenames:
        with open(fn) as f:
            out.write(f.read())

FileNotFoundError: [Errno 2] No such file or directory: 'Data\\yellow_tripdata_2015-01-06longer10Full.csv'
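
The FileNotFoundError above is raised when the output file is opened, which usually means that the Data folder does not exist relative to the working directory. The following is a sketch of a more defensive variant, not the original method: the helper name concat_csv_parts and the file names are made up. It creates the output folder, sorts the parts, and writes the header line only once.

```python
import glob
import os
import tempfile

def concat_csv_parts(pattern, out_path):
    """Concatenate CSV partition files, keeping only the first header line."""
    # Plain sorting is lexicographic, so part.10 would sort before part.2.
    filenames = sorted(glob.glob(pattern))
    if not filenames:
        raise FileNotFoundError(f"no files match {pattern!r}")
    # Create the output folder if it is missing.
    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
    with open(out_path, "w", newline="") as out:
        for i, fn in enumerate(filenames):
            with open(fn, newline="") as f:
                header = f.readline()
                if i == 0:
                    out.write(header)   # header only from the first part
                for line in f:          # stream line by line
                    out.write(line)
    return len(filenames)

# Small demonstration with two hypothetical partition files.
with tempfile.TemporaryDirectory() as tmp:
    for i, row in enumerate(["1,11.8\n", "2,24.3\n"]):
        with open(os.path.join(tmp, f"part.{i}.csv"), "w") as f:
            f.write("No.,Total_amount\n" + row)
    out_path = os.path.join(tmp, "merged", "full.csv")
    n_parts = concat_csv_parts(os.path.join(tmp, "part.*.csv"), out_path)
    with open(out_path) as f:
        merged = f.read()

print(n_parts)
print(merged)
```

The output file is deliberately placed where it does not match the input pattern, otherwise a second run would read its own result.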

# More functions in Dask

Group-by functions

In [25]:
df3 = df2.groupby('Payment_type').Trip_distance.mean().compute()
df3.head()

AttributeError: 'Series' object has no attribute 'compute'

In [20]:
trip_length = df2.groupby(df2['Passenger_count']).Trip_distance.max().compute()
trip_length

Passenger_count
0      4.40
1    137.30
2     47.95
3     33.63
4     42.10
5     32.30
6     21.25
7      0.00
8      2.75
9     11.90
Name: Trip_distance, dtype: float64

In [24]:
df3.to_csv(r"df3.csv", mode = 'w+')
import pandas as pd
df3 = pd.read_csv(r'df3.csv')
df3.columns  = ['Payment_type','avgtrip_distane']
df3

ValueError: Length mismatch: Expected axis has 3 elements, new values have 2 elements

Merge function <br>
df3 is a pandas DataFrame. Because Dask is built on top of pandas, pandas and Dask are compatible in many functions.

In [23]:
df2 = df2.merge(df3, how = 'inner', on='Payment_type').compute()
df2.head(10)

AttributeError: 'DataFrame' object has no attribute 'compute'

Map function

In [30]:
import math
df['Pickup_longitude'] = df['Pickup_longitude'].map(lambda x: math.ceil(x) )
df['Dropoff_longitude'] = df['Dropoff_longitude'].map(lambda x: math.ceil(x) )
df.head(10)

Unnamed: 0,No.,vendorid,pickup_datetime,dropoff_datetime,Store_and_fwd_flag,rate_code,Pickup_longitude,Pickup_latitude,Dropoff_longitude,Dropoff_latitude,...,Fare_amount,Extra,MTA_tax,Tip_amount,Tolls_amount,Ehail_fee,Improvement_surcharge,Total_amount,Payment_type,Trip_type
0,1,1,03/12/2015 09:26:05 AM,03/12/2015 09:33:58 AM,N,1,-73,40.826733,-73,40.795403,...,10.0,0.0,0.5,1.0,0.0,,0.3,11.8,1,1
1,2,1,05/29/2015 03:27:07 AM,05/29/2015 03:45:28 AM,N,1,-73,40.823772,-73,40.751713,...,23.0,0.5,0.5,0.0,0.0,,0.3,24.3,2,1
2,3,2,11/24/2015 08:07:46 PM,11/24/2015 08:31:35 PM,N,1,-73,40.72718,-73,40.69326,...,19.5,0.5,0.5,0.0,0.0,,0.3,20.8,2,1
3,4,2,12/18/2015 06:46:25 PM,12/18/2015 06:53:06 PM,N,1,-73,40.720379,-73,40.73299,...,6.5,1.0,0.5,2.0,0.0,,0.3,10.3,1,1
4,5,2,09/22/2015 06:32:36 PM,09/22/2015 06:34:44 PM,N,1,-73,40.713699,-73,40.713306,...,4.0,1.0,0.5,0.0,0.0,,0.3,5.8,2,1
5,6,2,05/15/2015 08:42:44 PM,05/15/2015 08:56:03 PM,N,1,-73,40.710148,-73,40.695229,...,13.5,0.5,0.5,0.0,0.0,,0.3,14.8,2,1
6,7,2,09/02/2015 02:38:54 PM,09/02/2015 02:45:25 PM,N,1,-73,40.759331,-73,40.771461,...,7.0,0.0,0.5,1.56,0.0,,0.3,9.36,1,1
7,8,2,03/29/2015 04:56:26 AM,03/29/2015 05:36:26 AM,N,1,-73,40.721008,-74,40.739941,...,29.0,0.5,0.5,6.06,0.0,,0.3,36.36,1,1
8,9,2,05/30/2015 04:22:47 AM,05/30/2015 04:42:57 AM,N,1,-73,40.72171,-73,40.694427,...,16.0,0.5,0.5,3.46,0.0,,0.3,20.76,1,1
9,10,2,03/29/2015 01:46:20 PM,03/29/2015 01:56:56 PM,N,1,-73,40.811382,-73,40.833076,...,9.0,0.0,0.5,0.0,0.0,,0.3,9.8,2,1


The where function is available as well.

In [32]:
df5 = df.where(df['Pickup_longitude'] != df['Dropoff_longitude'])
df5.compute()

Unnamed: 0,No.,vendorid,pickup_datetime,dropoff_datetime,Store_and_fwd_flag,rate_code,Pickup_longitude,Pickup_latitude,Dropoff_longitude,Dropoff_latitude,...,Fare_amount,Extra,MTA_tax,Tip_amount,Tolls_amount,Ehail_fee,Improvement_surcharge,Total_amount,Payment_type,Trip_type
0,,,,,,,,,,,...,,,,,,,,,,
1,,,,,,,,,,,...,,,,,,,,,,
2,,,,,,,,,,,...,,,,,,,,,,
3,,,,,,,,,,,...,,,,,,,,,,
4,,,,,,,,,,,...,,,,,,,,,,
5,,,,,,,,,,,...,,,,,,,,,,
6,,,,,,,,,,,...,,,,,,,,,,
7,8.0,2.0,03/29/2015 04:56:26 AM,03/29/2015 05:36:26 AM,N,1.0,-73.0,40.721008,-74.0,40.739941,...,29.0,0.5,0.5,6.06,0.0,,0.3,36.36,1.0,1.0
8,,,,,,,,,,,...,,,,,,,,,,
9,,,,,,,,,,,...,,,,,,,,,,


The dropna function is available too.

In [33]:
df5 = df5.dropna(how = 'all')
df5.compute()

Unnamed: 0,No.,vendorid,pickup_datetime,dropoff_datetime,Store_and_fwd_flag,rate_code,Pickup_longitude,Pickup_latitude,Dropoff_longitude,Dropoff_latitude,...,Fare_amount,Extra,MTA_tax,Tip_amount,Tolls_amount,Ehail_fee,Improvement_surcharge,Total_amount,Payment_type,Trip_type
7,8.0,2.0,03/29/2015 04:56:26 AM,03/29/2015 05:36:26 AM,N,1.0,-73.0,40.721008,-74.0,40.739941,...,29.0,0.5,0.5,6.06,0.00,,0.3,36.36,1.0,1.0
54,55.0,2.0,10/01/2015 02:35:59 PM,10/01/2015 03:05:13 PM,N,1.0,-74.0,40.648205,-73.0,40.698139,...,21.0,0.0,0.5,4.36,0.00,,0.3,26.16,1.0,1.0
74,75.0,2.0,07/16/2015 12:58:11 AM,07/16/2015 01:48:35 AM,N,1.0,-73.0,40.694859,-74.0,40.712341,...,37.5,0.5,0.5,0.00,9.75,,0.3,48.55,2.0,1.0
87,88.0,1.0,05/17/2015 08:17:01 PM,05/17/2015 08:45:12 PM,N,1.0,-73.0,40.725574,-74.0,40.735134,...,21.0,0.5,0.5,4.45,0.00,,0.3,26.75,1.0,1.0
98,99.0,2.0,08/20/2015 11:51:19 PM,08/21/2015 12:02:24 AM,N,1.0,-73.0,40.702984,-74.0,40.628799,...,18.5,0.5,0.5,0.00,0.00,,0.3,19.80,2.0,1.0
102,103.0,2.0,11/26/2015 08:06:53 PM,11/26/2015 08:21:48 PM,N,1.0,-73.0,40.700001,-74.0,40.739738,...,14.5,0.5,0.5,0.00,0.00,,0.3,15.80,1.0,1.0
114,115.0,2.0,09/15/2015 08:34:50 PM,09/15/2015 08:39:04 PM,N,1.0,-74.0,40.688339,-73.0,40.689243,...,5.0,0.5,0.5,0.00,0.00,,0.3,6.30,2.0,1.0
134,135.0,2.0,06/09/2015 02:14:02 PM,06/09/2015 02:54:45 PM,N,1.0,-74.0,40.678707,-73.0,40.750484,...,28.5,0.0,0.5,0.00,5.54,,0.3,34.84,2.0,1.0
149,150.0,2.0,09/06/2015 04:22:03 PM,09/06/2015 04:28:26 PM,N,1.0,-74.0,40.677818,-73.0,40.685101,...,6.5,0.0,0.5,1.46,0.00,,0.3,8.76,1.0,1.0
154,155.0,1.0,03/24/2015 11:03:32 AM,03/24/2015 11:24:17 AM,N,1.0,-73.0,40.616379,-74.0,40.626282,...,17.0,0.0,0.5,0.00,0.00,,0.3,17.80,2.0,1.0


We now have a DataFrame that contains all trips whose rounded pickup and drop-off longitudes differ, i.e. trips that crossed from the 73rd into the 74th degree of longitude or vice versa.

In [34]:
df5.count().compute()

No.                      4497
vendorid                 4497
pickup_datetime          4497
dropoff_datetime         4497
Store_and_fwd_flag       4497
rate_code                4497
Pickup_longitude         4497
Pickup_latitude          4497
Dropoff_longitude        4497
Dropoff_latitude         4497
Passenger_count          4497
Trip_distance            4497
Fare_amount              4497
Extra                    4497
MTA_tax                  4497
Tip_amount               4497
Tolls_amount             4497
Ehail_fee                   0
Improvement_surcharge    4497
Total_amount             4497
Payment_type             4497
Trip_type                4497
dtype: int64

That is 4,497 trips out of 100,000 in total.

# Which statistical functions are available?

In addition there are rolling statistics (rolling mean, rolling std, ...) as well as std, mean, quantiles, ...

In [35]:
df = dd.read_csv(r"taxi_hunderttausend.csv",error_bad_lines=False)

Standard deviation of Total_amount

In [36]:
std = df['Total_amount'].std().compute()
std

12.439756879724481

Rolling average over a window of 10 values (at least 5 values are required, so the first four results are NaN)

In [37]:
rolling = df['Total_amount'].rolling(10,5).mean().compute()
rolling.head(20)


0           NaN
1           NaN
2           NaN
3           NaN
4     14.600000
5     14.633333
6     13.880000
7     16.690000
8     17.142222
9     16.408000
10    17.808000
11    16.208000
12    15.040000
13    14.886000
14    14.306000
15    14.706000
16    14.400000
17    11.759000
18    10.798000
19    11.654000
Name: Total_amount, dtype: float64

How many distinct amounts were paid?

In [38]:
x = df['Total_amount'].nunique()
x.compute()

2157

# More features of Dask

Dask also runs on large clusters and can display dashboards that show the load.

In [39]:
%%HTML #2:12
<iframe width="560" height="315" src="https://www.youtube.com/embed/PAGjm4BMKlk?" frameborder="0" allowfullscreen></iframe>

In [41]:
%%html
<img src='dashboard-report-1.jpg'>