In [24]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
import pandas as pd

In [2]:
# Local Spark session using all available cores ("local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("sparkRdd")
    .getOrCreate()
)

23/04/04 22:19:29 WARN Utils: Your hostname, smarteez-HP-ProBook-440-G8-Notebook-PC resolves to a loopback address: 127.0.1.1; using 192.168.1.12 instead (on interface wlp0s20f3)
23/04/04 22:19:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/04 22:19:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Explicit schema for the green-taxi CSV files: (column name, Spark type) pairs,
# in file column order, every column nullable.
green_schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('VendorID', types.IntegerType()),
        ('lpep_pickup_datetime', types.TimestampType()),
        ('lpep_dropoff_datetime', types.TimestampType()),
        ('store_and_fwd_flag', types.StringType()),
        ('RatecodeID', types.IntegerType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('passenger_count', types.IntegerType()),
        ('trip_distance', types.DoubleType()),
        ('fare_amount', types.DoubleType()),
        ('extra', types.DoubleType()),
        ('mta_tax', types.DoubleType()),
        ('tip_amount', types.DoubleType()),
        ('tolls_amount', types.DoubleType()),
        ('ehail_fee', types.DoubleType()),
        ('improvement_surcharge', types.DoubleType()),
        ('total_amount', types.DoubleType()),
        ('payment_type', types.IntegerType()),
        ('trip_type', types.IntegerType()),
        ('congestion_surcharge', types.DoubleType()),
    ]
])

In [4]:
# Read every green-taxi CSV under data/raw/green/2020/ with a header row,
# using the explicit green_schema rather than letting Spark guess types.
df_green = spark.read.csv("data/raw/green/2020/*/*", schema=green_schema, header=True)

Reference query (Spark SQL) for hourly revenue and trip counts per pickup zone:

```sql
SELECT
    date_trunc("hour", pickup_datetime) AS hour,
    PULocationID AS zone,
    SUM(total_amount) AS green_amounts,
    COUNT(1) AS green_number_records
FROM green
WHERE pickup_datetime >= "2020-01-01 00:00:00"
GROUP BY 1, 2
ORDER BY 3 DESC
```

In [31]:
# Keep only the three columns the RDD transformations read (pickup time,
# pickup zone, total amount), then drop down to the underlying RDD of Rows.
rdd_green = df_green.select(["lpep_pickup_datetime", "PULocationID", "total_amount"]).rdd

In [6]:
from datetime import datetime

In [7]:
# Lower bound (inclusive) on pickup time for the analysis window.
start=datetime(2020,1,1)
def filteroutliers(row):
    """Return True when the trip's pickup time is on or after `start`.

    Mirrors the reference query's ``WHERE pickup_datetime >= "2020-01-01 00:00:00"``:
    the bound is inclusive, and a row whose pickup time is null (the schema
    declares the column nullable) is dropped instead of raising TypeError
    from comparing None with a datetime.
    """
    pickup = row.lpep_pickup_datetime
    return pickup is not None and pickup >= start

In [8]:
def transform(row):
    """Map a trip row to a ``((hour, zone), (amount, 1))`` key/value pair.

    The pickup timestamp is truncated to the start of its hour, so that
    ``reduceByKey`` can aggregate amount and trip count per (hour, zone).
    """
    hour_bucket = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    key = (hour_bucket, row.PULocationID)
    value = (row.total_amount, 1)
    return key, value

In [9]:
def reduce(left_value,right_value):
    """Combine two ``(amount, count)`` partial aggregates for ``reduceByKey``.

    Amounts follow SQL ``SUM`` semantics. A None amount is ignored, and the
    result is None only when both sides are None. Counts are always added,
    like ``COUNT(1)``. Without the None handling, a single trip with a
    missing ``total_amount`` (a nullable column in the schema) would raise
    TypeError and fail the whole job.

    Note: the name shadows ``functools.reduce`` if that is ever imported here.
    """
    left_amount, left_count = left_value
    right_amount, right_count = right_value
    if left_amount is None:
        output_amount = right_amount
    elif right_amount is None:
        output_amount = left_amount
    else:
        output_amount = left_amount + right_amount
    return (output_amount, left_count + right_count)

In [10]:
from collections import namedtuple

In [11]:
# Row type for the aggregated output; field order matches the `schema`
# (hour, zone, revenue, count) used for the DataFrame conversion.
revenue_tuple = namedtuple('revenue_row', 'hour zone revenue count')

In [12]:
def unwrap(row):
    """Flatten a reduced ``((hour, zone), (revenue, count))`` pair into a ``revenue_tuple``."""
    key, value = row[0], row[1]
    return revenue_tuple(hour=key[0], zone=key[1], revenue=value[0], count=value[1])

In [13]:
# Output schema for the per-(hour, zone) aggregate; column order matches the
# fields of revenue_tuple. All columns nullable.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('hour', types.TimestampType()),
        ('zone', types.LongType()),
        ('revenue', types.DoubleType()),
        ('count', types.LongType()),
    ]
])

In [14]:
# Hand-written RDD version of the reference SQL aggregation.
# Source is rdd_green: the original referenced an undefined `rdd` (left over
# from a deleted cell), which fails with NameError on a fresh kernel.
rdd_to_df = (
    rdd_green
    .filter(filteroutliers)   # drop trips outside the analysis window
    .map(transform)           # -> ((hour, zone), (amount, 1))
    .reduceByKey(reduce)      # combine amounts and counts per (hour, zone)
    .map(unwrap)              # flatten to revenue_tuple rows
    .toDF(schema)
)

In [23]:
# Preview the first 20 aggregated rows (Spark's default preview size).
rdd_to_df.show(n=20)

+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2020-01-01 00:00:00|  35|            129.96|    5|
|2020-01-01 00:00:00| 256|296.62000000000006|   13|
|2020-01-01 00:00:00| 255|            666.34|   28|
|2020-01-01 00:00:00|  24|              87.6|    3|
|2020-01-01 00:00:00| 244|183.57999999999998|   12|
|2020-01-01 01:00:00| 244|255.85000000000002|   20|
|2020-01-01 00:00:00| 146|             99.37|    6|
|2020-01-01 00:00:00| 193|               8.3|    1|
|2020-01-01 00:00:00| 157|             52.86|    2|
|2020-01-01 01:00:00|  75|292.89000000000004|   21|
|2020-01-01 00:00:00| 177|            274.95|   10|
|2020-01-01 01:00:00|  41| 819.4999999999998|   61|
|2020-01-01 00:00:00| 198|            195.11|    5|
|2020-01-01 00:00:00| 254| 73.82000000000001|    3|
|2020-01-01 01:00:00| 264|             71.96|    4|
|2020-01-01 00:00:00| 106|             10.56|    1|
|2020-01-01 

In [40]:
def apply_some_stuff(partition):
    """Materialise one Spark partition as a pandas DataFrame and re-emit its rows.

    ``partition`` is the row iterator that ``mapPartitions`` passes in. Each
    yielded item is a namedtuple from ``DataFrame.itertuples()``, so it
    carries an extra leading ``Index`` field ahead of the three data columns.
    """
    frame = pd.DataFrame(partition, columns=["lpep_pickup_datetime", "PULocationID", "total_amount"])
    yield from frame.itertuples()

In [47]:
(
    rdd_green
    .mapPartitions(apply_some_stuff)
    .toDF()
    .drop('Index')            # itertuples() adds the pandas row index as a field
    .select('total_amount')
    .show(10)
)

[Stage 25:>                                                         (0 + 1) / 1]

+------------+
|total_amount|
+------------+
|        4.81|
|       24.36|
|       15.34|
|       25.05|
|        11.3|
|        14.8|
|        12.3|
|        21.8|
|         6.8|
|         6.8|
+------------+
only showing top 10 rows



                                                                                