In [0]:
### Install if needed
# dbutils.library.installPyPI("koalas")
# dbutils.library.restartPython()

### Import Libraries / Style / Show Versions ###
import os
import pandas as pd
import pyspark
from pyspark.sql.types import *
import time
# import matplotlib
# import koalas as ks
                                              # Code erstellt mit:
!python --version                             # Python 3.7.5
print(pd.__version__)                         # 1.0.1
print(pyspark.__version__)                    # 3.0.0.dev0

In [0]:
%fs ls /FileStore/tables/

path,name,size
dbfs:/FileStore/tables/g10/,g10/,0
dbfs:/FileStore/tables/measures1.csv/,measures1.csv/,0
dbfs:/FileStore/tables/measures2.csv/,measures2.csv/,0
dbfs:/FileStore/tables/mivPq12/,mivPq12/,0
dbfs:/FileStore/tables/mivPq13/,mivPq13/,0
dbfs:/FileStore/tables/mivPq14/,mivPq14/,0
dbfs:/FileStore/tables/mivPq15/,mivPq15/,0
dbfs:/FileStore/tables/mivPq16/,mivPq16/,0
dbfs:/FileStore/tables/mivPq17/,mivPq17/,0
dbfs:/FileStore/tables/mivPq18/,mivPq18/,0


In [0]:
''' Um einzelne Tabellen im DBFS zu löschen:
dbutils.fs.rm("dbfs:/FileStore/tables/tabellenname.csv")
Um alle Tabellen unter dem Pfad zu löschen siehe nächste Zelle; Befehl muss alleine in einer Zelle stehen.
neues verzeichnis: dbutils.fs.mkdirs("dbfs:/FileStore/tables/")
''' 

In [0]:
''' Löscht ganzes (Sub-)Verzeichnis
%fs rm -r /FileStore/tables/
'''

In [0]:
### Variablen laden

# Webpages fuer Datensatz Motorisierter Individualverkehr von 2012 - 2020 (https://data.stadt-zuerich.ch/dataset/sid_dav_verkehrszaehlung_miv_od2031)

url_12 = "https://data.stadt-zuerich.ch/dataset/6212fd20-e816-4828-a67f-90f057f25ddb/resource/a0af24df-7b7f-4951-a08b-9391778c92d7/download/sid_dav_verkehrszaehlung_miv_od2031_2012.csv"
url_13 = "https://data.stadt-zuerich.ch/dataset/6212fd20-e816-4828-a67f-90f057f25ddb/resource/4b5aa585-a9f6-4cda-b626-4878d7c954bf/download/sid_dav_verkehrszaehlung_miv_od2031_2013.csv"
url_14 = "https://data.stadt-zuerich.ch/dataset/6212fd20-e816-4828-a67f-90f057f25ddb/resource/cdb43ead-a065-40a5-9565-56f6cfe95b5e/download/sid_dav_verkehrszaehlung_miv_od2031_2014.csv"
url_15 = "https://data.stadt-zuerich.ch/dataset/6212fd20-e816-4828-a67f-90f057f25ddb/resource/62457f7f-c247-4b87-beee-5677d98d7cf4/download/sid_dav_verkehrszaehlung_miv_od2031_2015.csv"
url_16 = "https://data.stadt-zuerich.ch/dataset/6212fd20-e816-4828-a67f-90f057f25ddb/resource/f454b4d5-7570-4547-8ee0-2bde785ec628/download/sid_dav_verkehrszaehlung_miv_od2031_2016.csv"
url_17 = "https://data.stadt-zuerich.ch/dataset/6212fd20-e816-4828-a67f-90f057f25ddb/resource/f873cc29-96ac-4b2f-b175-f733513e4012/download/sid_dav_verkehrszaehlung_miv_od2031_2017.csv"
url_18 = "https://data.stadt-zuerich.ch/dataset/6212fd20-e816-4828-a67f-90f057f25ddb/resource/d5963dee-7841-4e64-9268-6c850a2fc497/download/sid_dav_verkehrszaehlung_miv_od2031_2018.csv"
url_19 = "https://data.stadt-zuerich.ch/dataset/6212fd20-e816-4828-a67f-90f057f25ddb/resource/fa64fa70-6328-4d47-bcf0-1eff694d7c22/download/sid_dav_verkehrszaehlung_miv_od2031_2019.csv"
url_20 = "https://data.stadt-zuerich.ch/dataset/6212fd20-e816-4828-a67f-90f057f25ddb/resource/44607195-a2ad-4f9b-b6f1-d26c003d85a2/download/sid_dav_verkehrszaehlung_miv_od2031_2020.csv"

#### Laden und aggregieren via Pandas

In [0]:
### Daten via Panda laden 
'''
Haben wir gewählt weil wir in Spark keinen direkten CSV load ab einer URL gefunden haben
Problem: bei grösseren Datenmengen geht das nicht, weil alles über das Memory läuft. Alternative? 
Vielleicht Koalas?
Oder dann manuell alles in einen lokalen persistenten Speicher downloaden und dann portionenweise in Databricks hochladen
'''
miv_2012 = pd.read_csv(url_12, sep=",", header=0, index_col=0)
miv_2013 = pd.read_csv(url_13, sep=",", header=0, index_col=0)
miv_2014 = pd.read_csv(url_14, sep=",", header=0, index_col=0)

In [0]:
miv_2015 = pd.read_csv(url_15, sep=",", header=0, index_col=0)
miv_2016 = pd.read_csv(url_16, sep=",", header=0, index_col=0)
miv_2017 = pd.read_csv(url_17, sep=",", header=0, index_col=0)

In [0]:
miv_2018 = pd.read_csv(url_18, sep=",", header=0, index_col=0)
miv_2019 = pd.read_csv(url_19, sep=",", header=0, index_col=0)
miv_2020 = pd.read_csv(url_20, sep=",", header=0, index_col=0)

In [0]:
''' Pandas in ein csv wandeln (nur einmal laufen lassen)

miv12 = miv_2012.to_csv()
miv13 = miv_2013.to_csv()
miv14 = miv_2014.to_csv()
miv15 = miv_2015.to_csv()
miv16 = miv_2016.to_csv()
miv17 = miv_2017.to_csv()
miv18 = miv_2018.to_csv()
miv19 = miv_2019.to_csv()
miv20 = miv_2020.to_csv()
'''

In [0]:
''' Im DBFS abspeichern (nur einmal laufen lassen)
# Danach können die Daten dann via Spark/RDD verarbeitet werden.

dbutils.fs.put('/FileStore/tables/miv_2012.csv', miv12) 
dbutils.fs.put('/FileStore/tables/miv_2013.csv', miv13)
dbutils.fs.put('/FileStore/tables/miv_2014.csv', miv14)
dbutils.fs.put('/FileStore/tables/miv_2015.csv', miv15)
dbutils.fs.put('/FileStore/tables/miv_2016.csv', miv16)
dbutils.fs.put('/FileStore/tables/miv_2017.csv', miv17)
dbutils.fs.put('/FileStore/tables/miv_2018.csv', miv18)
dbutils.fs.put('/FileStore/tables/miv_2019.csv', miv19)
dbutils.fs.put('/FileStore/tables/miv_2020.csv', miv20) 
'''

In [0]:
# File mit 1 Jahr preparieren um 'save to DBFS' zu ermöglichen

miv_1 = miv_2012.to_csv()


In [0]:
# Auch hier: Gesamtaggregation sprengt das Memory wegen single node usage - Workaround 1 Jahr hochrechnen
# save_pd = 471.381

start = time.time()

dbutils.fs.put('/FileStore/tables/miv_1.csv', miv_1) 

# Zeitmessung aufzeichnen
stop = time.time()
save_pd = round((stop-start) * 9, 3)    # Linear hochrechnen auf 9 Jahre


In [0]:
# Aggregation der Files in Panda (single node approach)
# aggr_pd = 81.937

start = time.time()

def combine_file(raw_data1,raw_data2):
  raw_beide = pd.concat([raw_data1, raw_data2], axis = 0)
  return raw_beide

miv_pa_total = combine_file(miv_2012, miv_2013)
miv_pa_total = combine_file(miv_pa_total, miv_2014)
miv_pa_total = combine_file(miv_pa_total, miv_2015)
miv_pa_total = combine_file(miv_pa_total, miv_2016)
miv_pa_total = combine_file(miv_pa_total, miv_2017)
miv_pa_total = combine_file(miv_pa_total, miv_2018)
miv_pa_total = combine_file(miv_pa_total, miv_2019)
miv_pa_total = combine_file(miv_pa_total, miv_2020)

# Alternative Funktion wäre besser und schneller, aber für unseren Vergleich nehmen wir immer die gleiche obige Funktion.
# miv_pa_total = pd.concat([miv_2012, miv_2013, miv_2014, miv_2015, miv_2016, miv_2017, miv_2018, miv_2019, miv_2020], axis=0)

# Zeitmessung aufzeichnen
stop = time.time()
aggr_pd = round((stop-start), 3)


#### In Spark laden und aggregieren

In [0]:
# Daten in Spark laden - Community Edition pfad
# Saving to/from persistent tables --> spark.read.format

sp_miv12 = spark.read.format('csv') \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .load('/FileStore/tables/miv_2019.csv')

sp_miv13 = spark.read.format('csv') \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .load('/FileStore/tables/miv_2019.csv')

sp_miv14 = spark.read.format('csv') \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .load('/FileStore/tables/miv_2019.csv')

sp_miv15 = spark.read.format('csv') \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .load('/FileStore/tables/miv_2019.csv')

sp_miv16 = spark.read.format('csv') \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .load('/FileStore/tables/miv_2019.csv')

sp_miv17 = spark.read.format('csv') \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .load('/FileStore/tables/miv_2019.csv')

sp_miv18 = spark.read.format('csv') \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .load('/FileStore/tables/miv_2019.csv')

sp_miv19 = spark.read.format('csv') \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .load('/FileStore/tables/miv_2019.csv')

sp_miv20 = spark.read.format('csv') \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .load('/FileStore/tables/miv_2020.csv')

In [0]:
# Aggregation der Files mit Spark Dataframes

start = time.time()

def combine_files(raw_data1, raw_data2):
  raw_beide = raw_data1.union(raw_data2)
  return(raw_beide)

miv_sp_total = combine_files(sp_miv12, sp_miv13)
miv_sp_total = combine_files(miv_sp_total, sp_miv14)
miv_sp_total = combine_files(miv_sp_total, sp_miv15)
miv_sp_total = combine_files(miv_sp_total, sp_miv16)
miv_sp_total = combine_files(miv_sp_total, sp_miv17)
miv_sp_total = combine_files(miv_sp_total, sp_miv18)
miv_sp_total = combine_files(miv_sp_total, sp_miv19)
miv_sp_total = combine_files(miv_sp_total, sp_miv20)

# Zeitmessung aufzeichnen
stop = time.time()
aggr_sp = round((stop-start), 3)

In [0]:
# Verzeichnis löschen um vergleichbare Zeitmessungen zu erhalten
dbutils.fs.rm('/FileStore/tables/miv_sp_total.csv', True)

In [0]:
start = time.time()

miv_sp_total.write.mode("overwrite").save("dbfs:/FileStore/tables/miv_sp_total.csv")

# Zeitmessung aufzeichnen
stop = time.time()
save_sp = round((stop-start), 3)


#### In Parquet laden und aggregieren

In [0]:
''' nur einmal laufen lassen
# Erstelle Parquet Files
# Apache Parquet is a columnar file format that provides optimizations to speed up queries and is a far more efficient file format than CSV or JSON.

dbutils.fs.rm('/FileStore/tables/mivPq12', True)  
dbutils.fs.rm('/FileStore/tables/mivPq13', True)  
dbutils.fs.rm('/FileStore/tables/mivPq14', True)  
dbutils.fs.rm('/FileStore/tables/mivPq15', True)  
dbutils.fs.rm('/FileStore/tables/mivPq16', True)  
dbutils.fs.rm('/FileStore/tables/mivPq17', True)  
dbutils.fs.rm('/FileStore/tables/mivPq18', True) 
dbutils.fs.rm('/FileStore/tables/mivPq19', True) 
dbutils.fs.rm('/FileStore/tables/mivPq20', True) 

sp_miv12.write.parquet("/FileStore/tables/mivPq12")
sp_miv13.write.parquet("/FileStore/tables/mivPq13")
sp_miv14.write.parquet("/FileStore/tables/mivPq14")
sp_miv15.write.parquet("/FileStore/tables/mivPq15")
sp_miv16.write.parquet("/FileStore/tables/mivPq16")
sp_miv17.write.parquet("/FileStore/tables/mivPq17")
sp_miv18.write.parquet("/FileStore/tables/mivPq18")
sp_miv19.write.parquet("/FileStore/tables/mivPq19")
sp_miv20.write.parquet("/FileStore/tables/mivPq20")  
'''

In [0]:
# Read from parquet file
# Return the contents of a file from a Databricks File System (DBFS)

pq_miv12 = spark.read.parquet("/FileStore/tables/mivPq12")
pq_miv13 = spark.read.parquet("/FileStore/tables/mivPq13")
pq_miv14 = spark.read.parquet("/FileStore/tables/mivPq14")
pq_miv15 = spark.read.parquet("/FileStore/tables/mivPq15")
pq_miv16 = spark.read.parquet("/FileStore/tables/mivPq16")
pq_miv17 = spark.read.parquet("/FileStore/tables/mivPq17")
pq_miv18 = spark.read.parquet("/FileStore/tables/mivPq18")
pq_miv19 = spark.read.parquet("/FileStore/tables/mivPq19")
pq_miv20 = spark.read.parquet("/FileStore/tables/mivPq20")

In [0]:
# Aggregation der Files mit Spark Parquet

start = time.time()

def combine_files(raw_data1, raw_data2):
  raw_beide = raw_data1.union(raw_data2)
  return(raw_beide)

miv_pq_total = combine_files(pq_miv12, pq_miv13)
miv_pq_total = combine_files(miv_pq_total, pq_miv14)
miv_pq_total = combine_files(miv_pq_total, pq_miv15)
miv_pq_total = combine_files(miv_pq_total, pq_miv16)
miv_pq_total = combine_files(miv_pq_total, pq_miv17)
miv_pq_total = combine_files(miv_pq_total, pq_miv18)
miv_pq_total = combine_files(miv_pq_total, pq_miv19)
miv_pq_total = combine_files(miv_pq_total, pq_miv20)

# Zeitmessung aufzeichnen
stop = time.time()
aggr_pq = round((stop-start), 3)


In [0]:
# Verzeichnis löschen um vergleichbare Zeitmessungen zu erhalten
dbutils.fs.rm('/FileStore/tables/mivPqTotal', True)  

In [0]:
start = time.time()

miv_pq_total.write.parquet("/FileStore/tables/mivPqTotal")  

# Zeitmessung aufzeichnen
stop = time.time()
save_pq = round((stop-start), 3)

In [0]:
# DataFrame df mit allen Zeiten
# aggr_pd und save_pd muss mit einem hochgerechneten Wert gefüllt werden wegen oben erwähntem Memory Problem

schema = StructType([StructField("ObjektFormat", StringType(), True),
                     StructField("t_aggregation", FloatType(), True),
                     StructField("a_factor", FloatType(), True),
                     StructField("t_speichern", FloatType(), True),
                     StructField("s_factor", FloatType(), True)])

dataset = [("Pandas", aggr_pd, 0.0, save_pd, 0.0), 
           ("SparkDF", aggr_sp, round(aggr_pd/aggr_sp, 0), save_sp, round(save_pd/save_sp, 2)), 
           ("Parquet", aggr_pq, round(aggr_pd/aggr_pq, 0), save_pq, round(save_pd/save_pq, 2))]
            
measures1 = spark.createDataFrame(dataset, schema)
measures1.display()

ObjektFormat,t_aggregation,a_factor,t_speichern,s_factor
Pandas,81.937,0.0,471.381,0.0
SparkDF,0.279,294.0,238.673,1.98
Parquet,0.086,953.0,127.643,3.69


In [0]:
dbutils.fs.rm('/FileStore/tables/measures1.csv', True) 
measures1.write.parquet("dbfs:/FileStore/tables/measures1")

#### Zeitmessungen einer Select Funktion auf den 3 verschiedenen Aggregationen

In [0]:
miv_pa_total = pd.concat([miv_2012, miv_2013, miv_2014, miv_2015, miv_2016, miv_2017, miv_2018, miv_2019, miv_2020], axis=0)

In [0]:
# Basis der Messung ist die Aggregation auf Pandas

start = time.time()

# Äquivalent zu ("SELECT AnzFahrzeuge FROM pandas WHERE AnzFahrzeuge <= 200 AND (Richtung = 'Bucheggplatz' OR Achse = 'Rosengartenstrasse')")
query_result = miv_pa_total.loc[(miv_pa_total['AnzFahrzeuge'] <= 200) & ((miv_pa_total['Richtung'] == 'Bucheggplatz') | (miv_pa_total['Achse'] == 'Rosengartenstrasse'))]
query_result.head(5)

stop = time.time()
query_pd = round((stop-start), 3)


In [0]:
start = time.time()
query_result.ZSID.count()
stop = time.time()
count_pd = round((stop-start), 3)

In [0]:
# Messung der Funktionsdauer auf Spark Dataframes

start = time.time()

miv_sp_total.createOrReplaceTempView("spark_df")
query_result = spark.sql("SELECT AnzFahrzeuge FROM spark_df WHERE AnzFahrzeuge <= 200 AND (Richtung = 'Bucheggplatz' OR Achse = 'Rosengartenstrasse')")
query_result.show(5)

stop = time.time()
query_sp = round((stop-start), 3)


In [0]:
start = time.time()
query_result.count()
stop = time.time()
count_sp = round((stop-start), 3)

In [0]:
# Messung der Funktionsdauer auf Spark Parquet

start = time.time()

miv_pq_total.createOrReplaceTempView("parquet_df")
query_result = spark.sql("SELECT AnzFahrzeuge FROM parquet_df WHERE AnzFahrzeuge <= 200 AND (Richtung = 'Bucheggplatz' OR Achse = 'Rosengartenstrasse')")
query_result.show(5)

stop = time.time()
query_pq = round((stop-start), 3)


In [0]:
start = time.time()
query_result.count()
stop = time.time()
count_pq = round((stop-start), 3)

In [0]:
# Zweites DataFrame mit allen Zeiten

schema2 = StructType([StructField("Objekt Format", StringType(), True),
                     StructField("t_select", FloatType(), True),
                     StructField("t_count", FloatType(), True)])

dataset2 = [("Pandas", query_pd, count_pd), 
           ("SparkDF", query_sp, count_sp), 
           ("Parquet", query_pq, count_pq)]
            
measures2 = spark.createDataFrame(dataset2, schema2)
measures2.display()

In [0]:
dbutils.fs.rm('/FileStore/tables/measures2.csv', True) 
measures2.write.mode("overwrite").save("dbfs:/FileStore/tables/measures2.csv")

#### Tests der Skalen-Effekte auf in den 3 Formaten

In [0]:
# Basis bildet wieder das Pandas 
# dazu werden 3 verschieden grosse Files erstellt

pd_small = miv_2012
pd_med = pd.concat([miv_2012, miv_2013, miv_2014], axis=0)
pd_large = pd.concat([miv_2012, miv_2013, miv_2014, miv_2015, miv_2016, miv_2017, miv_2018, miv_2019, miv_2020], axis=0)

In [0]:
''' only used as workaround to reduce memory usage
miv_2012 = [1]
miv_2013 = [1]
miv_2014 = [1]
miv_2015 = [1]
miv_2016 = [1]
miv_2017 = [1]
miv_2018 = [1]
miv_2019 = [1]
miv_2020 = [1]
pd_med = [1]
'''

In [0]:
# Zeitmessung auf Pandas

start = time.time()

pd_small_r = pd_small.loc[(pd_small['AnzFahrzeuge'] <= 200) & ((pd_small['Richtung'] == 'Bucheggplatz') | (pd_small['Achse'] == 'Rosengartenstrasse'))]
size_small_pd = int(pd_small.ZSID.count())

stop = time.time()
t_small_pd = round((stop-start), 3)
tr_small_pd = round(t_small_pd / size_small_pd * 1000000, 3)

In [0]:
start = time.time()

pd_med_r = pd_med.loc[(pd_med['AnzFahrzeuge'] <= 200) & ((pd_med['Richtung'] == 'Bucheggplatz') | (pd_med['Achse'] == 'Rosengartenstrasse'))]
size_med_pd = int(pd_med.ZSID.count())

stop = time.time()
t_med_pd = round((stop-start), 3)
tr_med_pd = round(t_med_pd / size_med_pd * 1000000, 3)

In [0]:
# die Berechung auf dem grossen File braucht zu viel Memory - kann so nicht ausgeführt werden

start = time.time()

pd_large_r = pd_large.loc[(pd_large['AnzFahrzeuge'] <= 200) & ((pd_large['Richtung'] == 'Bucheggplatz') | (pd_large['Achse'] == 'Rosengartenstrasse'))]
size_large_pd = int(pd_large.ZSID.count())

stop = time.time()
t_large_pd = round((stop-start), 3)
tr_large_pd = round(t_large_pd / size_large_pd * 1000000, 3)


In [0]:
'''# workaround wegen memory usage (Hochrechnung der Zeit)
size_large_pd = int(pd_large.ZSID.count())
t_large_pd = 300.333
tr_large_pd = round(t_large_pd / size_large_pd * 1000000, 3)
'''

In [0]:
# 3 verschieden grosse Spark Dataframes erstellen

sp_small = sp_miv12


def combine_files(raw_data1, raw_data2):
  raw_beide = raw_data1.union(raw_data2)
  return(raw_beide)

sp_med = combine_files(sp_miv12, sp_miv13)
sp_med = combine_files(sp_med, sp_miv14)

sp_large = combine_files(sp_miv12, sp_miv13)
sp_large = combine_files(sp_large, sp_miv14)
sp_large = combine_files(sp_large, sp_miv15)
sp_large = combine_files(sp_large, sp_miv16)
sp_large = combine_files(sp_large, sp_miv17)
sp_large = combine_files(sp_large, sp_miv18)
sp_large = combine_files(sp_large, sp_miv19)
sp_large = combine_files(sp_large, sp_miv20)


In [0]:
# Messung der Funktionsdauer auf Spark Dataframes

start = time.time()

sp_small.createOrReplaceTempView("spark_df")
sp_small_r = spark.sql("SELECT AnzFahrzeuge FROM spark_df WHERE AnzFahrzeuge <= 200 AND (Richtung = 'Bucheggplatz' OR Achse = 'Rosengartenstrasse')")
size_small_sp = sp_small.count()

stop = time.time()
t_small_sp = round((stop-start), 3)
tr_small_sp = round(t_small_sp / size_small_sp * 1000000, 3)

In [0]:
start = time.time()

sp_med.createOrReplaceTempView("spark_df")
sp_med_r = spark.sql("SELECT AnzFahrzeuge FROM spark_df WHERE AnzFahrzeuge <= 200 AND (Richtung = 'Bucheggplatz' OR Achse = 'Rosengartenstrasse')")
size_med_sp = sp_med.count()

stop = time.time()
t_med_sp = round((stop-start), 3)
tr_med_sp = round(t_med_sp / size_med_sp * 1000000, 3)

In [0]:
start = time.time()

sp_large.createOrReplaceTempView("spark_df")
sp_large_r = spark.sql("SELECT AnzFahrzeuge FROM spark_df WHERE AnzFahrzeuge <= 200 AND (Richtung = 'Bucheggplatz' OR Achse = 'Rosengartenstrasse')")
size_large_sp = sp_large.count()

stop = time.time()
t_large_sp = round((stop-start), 3)
tr_large_sp = round(t_large_sp / size_large_sp * 1000000, 3)

In [0]:
'''### Parquet laden wenn nötig

pq_miv12 = spark.read.parquet("/FileStore/tables/mivPq12")
pq_miv13 = spark.read.parquet("/FileStore/tables/mivPq13")
pq_miv14 = spark.read.parquet("/FileStore/tables/mivPq14")
pq_miv15 = spark.read.parquet("/FileStore/tables/mivPq15")
pq_miv16 = spark.read.parquet("/FileStore/tables/mivPq16")
pq_miv17 = spark.read.parquet("/FileStore/tables/mivPq17")
pq_miv18 = spark.read.parquet("/FileStore/tables/mivPq18")
pq_miv19 = spark.read.parquet("/FileStore/tables/mivPq19")
pq_miv20 = spark.read.parquet("/FileStore/tables/mivPq20")
'''

In [0]:
pq_small = pq_miv12

def combine_files(raw_data1, raw_data2):
  raw_beide = raw_data1.union(raw_data2)
  return(raw_beide)

pq_med = combine_files(pq_miv12, pq_miv13)
pq_med = combine_files(pq_med, pq_miv14)

pq_large = combine_files(pq_miv12, pq_miv13)
pq_large = combine_files(pq_large, pq_miv14)
pq_large = combine_files(pq_large, pq_miv15)
pq_large = combine_files(pq_large, pq_miv16)
pq_large = combine_files(pq_large, pq_miv17)
pq_large = combine_files(pq_large, pq_miv18)
pq_large = combine_files(pq_large, pq_miv19)
pq_large = combine_files(pq_large, pq_miv20)


In [0]:
# Messung der Funktionsdauer auf Parquet

start = time.time()

pq_small.createOrReplaceTempView("parquet")
pq_small_r = spark.sql("SELECT AnzFahrzeuge FROM parquet WHERE AnzFahrzeuge <= 200 AND (Richtung = 'Bucheggplatz' OR Achse = 'Rosengartenstrasse')")
size_small_pq = pq_small.count()

stop = time.time()
t_small_pq = round((stop-start), 3)
tr_small_pq = round(t_small_pq / size_small_pq * 1000000, 3)

In [0]:
start = time.time()

pq_med.createOrReplaceTempView("parquet")
pq_med_r = spark.sql("SELECT AnzFahrzeuge FROM parquet WHERE AnzFahrzeuge <= 200 AND (Richtung = 'Bucheggplatz' OR Achse = 'Rosengartenstrasse')")
size_med_pq = pq_med.count()

stop = time.time()
t_med_pq = round((stop-start), 3)
tr_med_pq = round(t_med_pq / size_med_pq * 1000000, 3)

In [0]:
start = time.time()

pq_large.createOrReplaceTempView("parquet")
pq_large_r = spark.sql("SELECT AnzFahrzeuge FROM parquet WHERE AnzFahrzeuge <= 200 AND (Richtung = 'Bucheggplatz' OR Achse = 'Rosengartenstrasse')")
size_large_pq = pq_large.count()

stop = time.time()
t_large_pq = round((stop-start), 3)
tr_large_pq = round(t_large_pq / size_large_pq * 1000000, 3)

In [0]:
# Drittes DataFrame mit allen Zeiten

schema3 = StructType([StructField("Objekt Format", StringType(), True),
                     StructField("rows", IntegerType(), True),
                     StructField("t_select", FloatType(), True),
                     StructField("t_average", FloatType(), True)])

dataset3 = [("Small Pandas", size_small_pd, t_small_pd, tr_small_pd), 
           ("Medium Pandas", size_med_pd, t_med_pd, tr_med_pd), 
           ("Large Pandas", size_large_pd, t_large_pd, tr_large_pd),
           ("Small DataFrame", size_small_sp, t_small_sp, tr_small_sp), 
           ("Medium DataFrame", size_med_sp, t_med_sp, tr_med_sp), 
           ("Large DataFrame", size_large_sp, t_large_sp, tr_large_sp),
           ("Small Parquet", size_small_pq, t_small_pq, tr_small_pq), 
           ("Medium Parquet", size_med_pq, t_med_pq, tr_med_pq), 
           ("Large Parquet", size_large_pq, t_large_pq, tr_large_pq)]

measures3 = spark.createDataFrame(dataset3, schema3)
measures3.display()

Objekt Format,rows,t_select,t_average
Small Pandas,1414224,0.242,0.171
Medium Pandas,4229160,3.181,0.752
Large Pandas,13484616,19.105,1.417
Small DataFrame,1579416,10.86,6.876
Medium DataFrame,4738248,21.679,4.575
Large DataFrame,14225280,65.151,4.58
Small Parquet,1579416,3.597,2.277
Medium Parquet,4738248,3.167,0.668
Large Parquet,14225280,7.274,0.511


In [0]:
dbutils.fs.rm('/FileStore/tables/measures3.csv', True) 
measures3.write.mode("overwrite").save("dbfs:/FileStore/tables/measures3.csv")

In [0]:
''' ################# Inspect Data #################
miv_pq_total.dtypes 
len(miv_pq_total.dtypes)
miv_pq_total.count()
miv_pq_total.show()
miv_pq_total.first()
miv_pq_total.take(3)
miv_pq_total.schema
miv_pq_total.explain()          # shows all the data sets
miv_pq_total.printSchema()
miv_pq_total.distinct().count() # Command took 2.50 minutes: Out[130]: 3169368
'''