# Koalas (PySpark) Benchmark against Pandas
## Data Collection
### Dependencies

In [None]:
# Optional one-time environment setup: uncomment a line to install that package.
#!pip install pyspark
#!pip install "pyarrow>=4.0.0" --prefer-binary
#!pip install koalas

In [None]:
# List the line and cell magics available in this kernel.
%lsmagic

In [1]:
import urllib.request
import os
import pandas as pd
import glob
import numpy as np
import time
import logging
import argparse
import pyarrow as pa
import pyarrow.parquet as pq
import os
import pyspark
import timeit
from contextlib import contextmanager
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains




### Taxi Dataset
https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page

# Folder the downloaded data is written to
output_path = "C:\\taxi"

# Latest month with published data (check the web site linked above for newer releases)
max_month = "2009-12"

# Every "YYYY-MM" label from 2000-01 through 2049-12, in chronological order
all_months = [
    "{0}-{1:02d}".format(year, month_no)
    for year in range(2000, 2050)
    for month_no in range(1, 13)
]

# Taxi datasets to fetch, each mapped to its (first month, last month) range.
# Adjust the first month as needed; further datasets can be enabled below.
taxi_types = {
    'yellow': ("2009-01", max_month),
    # 'green': ("2013-08", max_month),
    # 'fhv': ("2015-01", max_month),
}

# Download the monthly CSV files of every configured taxi type into <output_path>/<type>/.
for k, (first_month, last_month) in taxi_types.items():
    print("Processing \"{0}\"  ({1}  through  {2})".format(k, first_month, last_month))
    # "YYYY-MM" labels sort chronologically, so string comparison selects the range
    months = [x for x in all_months if first_month <= x <= last_month]

    type_path = "{0}/{1}".format(output_path, k)
    # exist_ok removes the check-then-create step and keeps the cell re-runnable
    os.makedirs(type_path, exist_ok=True)

    # Fetch each month from the S3 bucket on AWS that the data is stored in
    for m in months:
        url = "https://s3.amazonaws.com/nyc-tlc/trip+data/{0}_tripdata_{1}.csv".format(k, m)
        filename = "{0}/{1}.csv".format(type_path, m)

        # Do not download the file if we already have a copy of it
        if os.path.exists(filename):
            print("   Skipped {} (file already exists)".format(m))
            continue

        # Download under a temporary name and rename only on success, so an
        # interrupted transfer is never mistaken for a complete file on the next run.
        partial = filename + ".part"
        try:
            urllib.request.urlretrieve(url, partial)
            os.replace(partial, filename)
        finally:
            # Only present here if the download or the rename failed
            if os.path.exists(partial):
                os.remove(partial)
        print("   Downloaded {}".format(m))

    print("============================================================")

# A Look at Pandas

In [9]:
# With no argument, %cd switches the working directory to the user's home directory.
%cd

[WinError 123] The filename, directory name, or volume label syntax is incorrect: 'home C:\\taxi\\'
E:\Users\Sven Maso


In [None]:
# Benchmark: read all the CSVs we downloaded before with pandas and concatenate them.
start_time = timeit.default_timer()
# Sorted so the row order of the combined frame does not depend on directory listing order
all_files = sorted(glob.glob("E:/Users/Sven Maso/glob/*.csv"))
if not all_files:
    # pd.concat would otherwise fail with an opaque "No objects to concatenate"
    raise FileNotFoundError("No CSV files found in E:/Users/Sven Maso/glob")
# ignore_index gives the combined frame one continuous row index instead of
# repeating every file's own 0..n-1 labels
df = pd.concat((pd.read_csv(f) for f in all_files), ignore_index=True)
# Stop the clock before displaying anything so only the read/concat work is measured
elapsed = timeit.default_timer() - start_time
df.head()

In [None]:
# Show how long the pandas read/concat cell took (seconds, from timeit.default_timer)
elapsed

In [None]:
# Display the concatenated pandas DataFrame
df

# Benchmark: write the concatenated pandas DataFrame out as a single CSV file.
start_time2 = timeit.default_timer()
# `df` is the frame built by the read cell; no variable named `frame` exists
df.to_csv("example_csv.csv")
# Measure against this cell's own start time, not the read benchmark's
elapsed2 = timeit.default_timer() - start_time2

In [None]:
# Seconds taken by the CSV write benchmark (a plain float, so it is displayed directly)
elapsed2

# A Look at PySpark

In [2]:
# Create (or reuse) the SparkSession: run Spark locally with a single worker
# thread, give the app a name and give the driver 4GB of RAM to work with.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName('Pyspark_Benchmark')
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)
# Handle to the SparkContext behind the session
sc = spark.sparkContext
# Address of the Spark web UI for this application
SparkUI = sc.uiWebUrl

# Report the versions in use and where the UI can be reached
print("Spark Version: " + sc.version)
print("PySpark Version: " + pyspark.__version__)
print("Access SparkUI at: " + SparkUI)

Spark Version: 3.2.1
PySpark Version: 3.2.1
Access SparkUI at: http://DESKTOP-FMCAEPI.fritz.box:4040


In [4]:
# Schema the taxi DataFrame is built on: 18 nullable columns, listed in order.
schema = StructType([
    StructField(column_name, column_type(), True)
    for column_name, column_type in (
        ("vendor_id", StringType),
        ("pickup_datetime", TimestampType),
        ("dropoff_datetime", TimestampType),
        ("passenger_count", IntegerType),
        ("trip_distance", DoubleType),
        ("pickup_longitude", DoubleType),
        ("pickup_latitude", DoubleType),
        ("rate_code_id", IntegerType),
        ("store_and_fwd_flag", StringType),
        ("dropoff_longitude", DoubleType),
        ("dropoff_latitude", DoubleType),
        ("payment_type", StringType),
        ("fare_amount", DoubleType),
        ("extra", DoubleType),
        ("mta_tax", DoubleType),
        ("tip_amount", DoubleType),
        ("tolls_amount", DoubleType),
        ("total_amount", DoubleType),
    )
])

In [None]:
# Benchmark: load one month of taxi data into a Spark DataFrame.
start_time3 = timeit.default_timer()

# The DataFrame reader lives on the SparkSession (`spark.read`); SparkContext has no
# `read` attribute. Passing the predefined schema avoids inferSchema, which would
# need an extra pass over the file.
# NOTE(review): assumes the CSV columns appear in the same order as `schema` — confirm.
df_with_schema = spark.read.csv("E:/Users/Sven Maso/glob/2009-01.csv", header=True, schema=schema)
# Keep the name this cell originally assigned bound to the same DataFrame
df = df_with_schema
# Measure against this cell's own start time, not the pandas benchmark's.
# NOTE(review): Spark evaluates lazily, so this mostly times setting up the read;
# trigger an action (e.g. count()) if the full load should be measured.
elapsed3 = timeit.default_timer() - start_time3

In [None]:
# DataFrame readers hang off the SparkSession; SparkContext has no `read` attribute
df = spark.read.csv("E:/Users/Sven Maso/glob/2009-01.csv", header=True)

In [None]:
# Seconds recorded for the Spark CSV read benchmark
elapsed3

In [None]:
# Benchmark: store the DataFrame as a Parquet file, which is compressed and faster than a CSV.
start_time4 = timeit.default_timer()
# "overwrite" keeps the cell re-runnable; the default save mode raises once data.parquet exists
df_with_schema.write.mode("overwrite").parquet("data.parquet")
# Measure against this cell's own start time, not the pandas benchmark's
elapsed4 = timeit.default_timer() - start_time4

In [None]:
# Seconds recorded for the Parquet write benchmark
elapsed4

In [None]:
# Benchmark: read the Parquet file back into a Spark DataFrame.
start_time5 = timeit.default_timer()
parDF1 = spark.read.parquet("data.parquet")
# Measure against this cell's own start time, not the pandas benchmark's
elapsed5 = timeit.default_timer() - start_time5

In [None]:
# Seconds recorded for the Parquet read benchmark
elapsed5

In [None]:
# Print the first rows of the Spark DataFrame
df_with_schema.show()

## Plotting

In [None]:
import numpy as np
import matplotlib.pyplot as plt
 
 
# Creating dataset: labels and values are listed in the same order, so every
# wedge is labelled with the model its value belongs to.
AI = ['Transformer (65M parameters)', 'Transformer (213M parameters)', 'ELMo', 'BERT (110M parameters)', 'Transformer (213M parameters) w/ neural architecture search']

# Carbon footprint per model in lbs of CO2e. The last two values are thousands
# (1,438 and 626,155), not the decimals 1.438 and 626.155.
data = [26, 192, 262, 1438, 626155]

# Creating explode data: how far each wedge is pulled out of the pie
explode = (0.15, 0.05, 0.0, 0.0, 0.02)

# Creating color parameters, one per wedge
colors = ("orange", "cyan", "brown", "grey", "indigo")

# Wedge properties
wp = {'linewidth': 1, 'edgecolor': "black"}
 
# Creating autopct arguments
def func(pct, allvalues):
    """Format a pie-wedge label as its percentage plus the absolute value in lbs.

    Args:
        pct: The wedge's share of the pie, in percent.
        allvalues: All plotted values; their sum turns ``pct`` back into an
            absolute amount.

    Returns:
        A two-line string such as ``"50.0%\\n(10 lbs)"``.
    """
    # Round instead of truncating: pct is a float, so pct / 100 * total can land
    # just below the true integer (e.g. 28.999...) and int() alone would give 28.
    absolute = int(round(pct / 100. * np.sum(allvalues)))
    return "{:.1f}%\n({:d} lbs)".format(pct, absolute)
 
# Creating plot: autopct routes each wedge's percentage through func together
# with the full data list
fig, ax = plt.subplots(figsize=(20, 14))
wedges, texts, autotexts = ax.pie(
    data,
    labels=AI,
    colors=colors,
    explode=explode,
    autopct=lambda share: func(share, data),
    wedgeprops=wp,
    startangle=90,
    shadow=True,
)

# Title and bold 12pt percentage labels
ax.set_title("The estimated Carbon footprint of training a Model in lbs")
plt.setp(autotexts, size=12, weight="bold")

# Adding legend
ax.legend(
    wedges,
    AI,
    title="AI Model",
    loc="center right",
    bbox_to_anchor=(1, 0, 0.5, 2),
)

# show plot
plt.show()

# Summary table of the training footprint per model. The data is kept under a
# descriptive name so the built-in `dict` type is not shadowed for the rest of
# the notebook.
ai_models = {
    'AI Model': ['Transformer (65M parameters)', 'Transformer (213M parameters)', 'ELMo', 'BERT (110M parameters)', 'Transformer (213M parameters) w/ neural architecture search', 'GPT-2'],
    'Date of original paper': ['Jun, 2017', 'Jun, 2017', 'Feb, 2018', 'Oct, 2018', 'Jan, 2019', 'Feb, 2019'],
    'Energy consumption (kWh)': ['27', '201', '275', '1,507', '656,347', '0'],
    'Carbon footprint (lbs of CO2e)': ['26', '192', '262', '1,438', '626,155', '0'],
    # Raw strings keep the backslash before the first "$" (presumably there to stop
    # the notebook rendering "$...$" as math) without the invalid "\$" escape sequence.
    # NOTE(review): the upper bound in '\$289-$98' looks truncated — confirm against the source.
    'Cloud compute cost (USD)': [r'\$41-$140', r'\$289-$98', r'\$433-$1,472', r'\$3,751-$12,571', r'\$942,973-$3,201,722', r'\$12,902-$43,008'],
}
# set_index returns a new frame, so its result has to be kept for the index to take effect
aiframe = pd.DataFrame(data=ai_models).set_index('AI Model')
aiframe