In [1]:
import findspark
findspark.init()
findspark.find()

'C:\\Users\\Admin\\anaconda3\\envs\\SparkEnvironment\\Lib\\site-packages\\pyspark'

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = (
    SparkSession
    .builder
    .appName("EindowOperationApp")
    .master("local[4]")
    .config("spark.dynamicAloocation.enabled", "false")
    .config("spark.sql.adaptive", "false")
    .getOrCreate()
)

sc = spark.sparkContext

spark

In [3]:
# Create schema for Yellow Taxi Data
 
taxiSchema = (
                    StructType
                    ([ 
                        StructField("VendorId"               , IntegerType()   , True),
                        StructField("lpep_pickup_datetime"   , TimestampType() , True),
                        StructField("lpep_dropoff_datetime"  , TimestampType() , True),                            
                        StructField("passenger_count"        , DoubleType()    , True),
                        StructField("trip_distance"          , DoubleType()    , True),
                        StructField("RatecodeID"             , DoubleType()    , True),                            
                        StructField("store_and_fwd_flag"     , StringType()    , True),
                        StructField("PULocationID"           , IntegerType()   , True),
                        StructField("DOLocationID"           , IntegerType()   , True),                            
                        StructField("payment_type"           , IntegerType()   , True),                            
                        StructField("fare_amount"            , DoubleType()    , True),
                        StructField("extra"                  , DoubleType()    , True),
                        StructField("mta_tax"                , DoubleType()    , True),
                        StructField("tip_amount"             , DoubleType()    , True),
                        StructField("tolls_amount"           , DoubleType()    , True),
                        StructField("improvement_surcharge"  , DoubleType()    , True),
                        StructField("total_amount"           , DoubleType()    , True),
                        StructField("congestion_surcharge"   , DoubleType()    , True),
                        StructField("airport_fee"            , DoubleType()    , True)
                    ])
               )


In [4]:
yellowTaxiDf = (
    spark
    .read
    .option("header", "true")
    .schema(taxiSchema)
    .csv("C:\DataFiles\YellowTaxis_202210.csv")
)
yellowTaxiDf.printSchema()

root
 |-- VendorId: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [9]:
yellowTaxiDf.createOrReplaceTempView("yellowTaxis")

In [6]:
taxiZoneSchema = "LocationID INT, Borough STRING, Zone STRING, serviceZone STRING"

taxiZonesDf = (
    spark
    .read
    .schema(taxiZoneSchema)
    .csv("C:\DataFiles\TaxiZones.csv")
)

taxiZonesDf.createOrReplaceTempView("TaxiZones")
taxiZonesDf.show(200)

+----------+-------------+--------------------+-----------+
|LocationID|      Borough|                Zone|serviceZone|
+----------+-------------+--------------------+-----------+
|         1|          EWR|      Newark Airport|        EWR|
|         2|       Queens|         Jamaica Bay|  Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|  Boro Zone|
|         4|    Manhattan|       Alphabet City|Yellow Zone|
|         5|Staten Island|       Arden Heights|  Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|  Boro Zone|
|         7|       Queens|             Astoria|  Boro Zone|
|         8|       Queens|        Astoria Park|  Boro Zone|
|         9|       Queens|          Auburndale|  Boro Zone|
|        10|       Queens|        Baisley Park|  Boro Zone|
|        11|     Brooklyn|          Bath Beach|  Boro Zone|
|        12|    Manhattan|        Battery Park|Yellow Zone|
|        13|    Manhattan|   Battery Park City|Yellow Zone|
|        14|     Brooklyn|           Bay

In [13]:
#Get rides for each borough

taxiRidesDf = (
    spark.sql("""
    select tz.Borough, count(*) as RidesCount
    from TaxiZones tz
    inner join yellowTaxis yt on yt.PULocationID = tz.LocationID
    group by tz.Borough
    """)
)

taxiRidesDf.createOrReplaceTempView("TaxisRides")
taxiRidesDf.orderBy("Borough").show()

+-------------+----------+
|      Borough|RidesCount|
+-------------+----------+
|        Bronx|      4511|
|     Brooklyn|     28089|
|          EWR|      1157|
|    Manhattan|   3250695|
|       Queens|    333922|
|Staten Island|       303|
|      Unknown|     56735|
+-------------+----------+



In [15]:
#All rides across all Borough

taxiRidesWindowDf = (
    spark.sql("""
    select * , sum(RidesCount) over() as TotalRideCount from TaxisRides
    """)
)
taxiRidesWindowDf.orderBy("Borough").show()
taxiRidesWindowDf.createOrReplaceTempView("TaxisRideWindow")

+-------------+----------+--------------+
|      Borough|RidesCount|TotalRideCount|
+-------------+----------+--------------+
|        Bronx|      4511|       3675412|
|     Brooklyn|     28089|       3675412|
|          EWR|      1157|       3675412|
|    Manhattan|   3250695|       3675412|
|       Queens|    333922|       3675412|
|Staten Island|       303|       3675412|
|      Unknown|     56735|       3675412|
+-------------+----------+--------------+



In [16]:
#Share of each each borough in terms of rides

( spark.sql("""
select *, round(RidesCount * 100 )/TotalRideCount as RideSharePercent from TaxisRideWindow 
""")
).show()

+-------------+----------+--------------+--------------------+
|      Borough|RidesCount|TotalRideCount|    RideSharePercent|
+-------------+----------+--------------+--------------------+
|       Queens|    333922|       3675412|   9.085294383323557|
|          EWR|      1157|       3675412| 0.03147946407096674|
|      Unknown|     56735|       3675412|  1.5436364685101969|
|     Brooklyn|     28089|       3675412|  0.7642408524540922|
|Staten Island|       303|       3675412|0.008243973736821885|
|    Manhattan|   3250695|       3675412|   88.44437031821194|
|        Bronx|      4511|       3675412| 0.12273453969242087|
+-------------+----------+--------------+--------------------+



In [19]:
# Rides for zone inside Borough

taxiRidesDf = (
    spark.sql("""
    select tz.Borough, tz.Zone, count(*) as RideCount from TaxiZones tz inner join yellowTaxis yt 
    on yt.PULocationID = tz.LocationID
    group by tz.Borough, tz.Zone
    """)
)
taxiRidesDf.orderBy("Borough", "Zone").show()
taxiRidesDf.createOrReplaceTempView("TaxiRides")

+-------+--------------------+---------+
|Borough|                Zone|RideCount|
+-------+--------------------+---------+
|  Bronx|Allerton/Pelham G...|       51|
|  Bronx|        Bedford Park|       92|
|  Bronx|             Belmont|       59|
|  Bronx|          Bronx Park|       22|
|  Bronx|           Bronxdale|       48|
|  Bronx|         City Island|       11|
|  Bronx|  Claremont/Bathgate|       98|
|  Bronx|          Co-Op City|      200|
|  Bronx|        Country Club|        7|
|  Bronx|        Crotona Park|        2|
|  Bronx|   Crotona Park East|       45|
|  Bronx|East Concourse/Co...|      180|
|  Bronx|        East Tremont|      113|
|  Bronx|         Eastchester|       73|
|  Bronx|       Fordham South|       52|
|  Bronx|          Highbridge|      158|
|  Bronx|         Hunts Point|       67|
|  Bronx| Kingsbridge Heights|      116|
|  Bronx|            Longwood|       41|
|  Bronx|       Melrose South|      173|
+-------+--------------------+---------+
only showing top

In [25]:
#Calucalte total rides in each borough

taxiRidesWindowDf = (
    spark.sql("""
    select * , sum(RideCount) over(partition by borough) as TotalRidesInEachBorough from TaxiRides 
    """)
)

taxiRidesWindowDf.orderBy("Borough", "Zone").show(1000, truncate = False)
taxiRidesWindowDf.createOrReplaceTempView("TaxiRidesWindow")

+-------------+---------------------------------------------+---------+-----------------------+
|Borough      |Zone                                         |RideCount|TotalRidesInEachBorough|
+-------------+---------------------------------------------+---------+-----------------------+
|Bronx        |Allerton/Pelham Gardens                      |51       |4511                   |
|Bronx        |Bedford Park                                 |92       |4511                   |
|Bronx        |Belmont                                      |59       |4511                   |
|Bronx        |Bronx Park                                   |22       |4511                   |
|Bronx        |Bronxdale                                    |48       |4511                   |
|Bronx        |City Island                                  |11       |4511                   |
|Bronx        |Claremont/Bathgate                           |98       |4511                   |
|Bronx        |Co-Op City               

In [28]:
#Share of each borough
(
    spark.sql("""
    select * , round(RideCount*100)/TotalRidesInEachBorough as RidesShareInEachBorough from TaxiRidesWindow
    """).show(1000, truncate = False)
)

+-------------+---------------------------------------------+---------+-----------------------+-----------------------+
|Borough      |Zone                                         |RideCount|TotalRidesInEachBorough|RidesShareInEachBorough|
+-------------+---------------------------------------------+---------+-----------------------+-----------------------+
|Bronx        |Soundview/Bruckner                           |58       |4511                   |1.2857459543338505     |
|Bronx        |Eastchester                                  |73       |4511                   |1.618266459765019      |
|Bronx        |Woodlawn/Wakefield                           |90       |4511                   |1.9951230325870095     |
|Bronx        |West Concourse                               |407      |4511                   |9.022389714032366      |
|Bronx        |Crotona Park East                            |45       |4511                   |0.9975615162935048     |
|Bronx        |Morrisania/Melrose       

In [31]:
productDf = (
    spark
    .read
    .option("header", "true")
    .csv("C:\\DataFiles\\new-data\\products\\productRevenue.csv")
)
productDf.printSchema()

root
 |-- product: string (nullable = true)
 |-- category: string (nullable = true)
 |-- revenue: string (nullable = true)



In [32]:
productDf.createOrReplaceTempView("product")

In [33]:
productDf.show()

+----------+----------+-------+
|   product|  category|revenue|
+----------+----------+-------+
|      Thin|Cell Phone|   6000|
|    Normal|    Tablet|   1500|
|      Mini|    Tablet|   5500|
|Ultra Thin|Cell Phone|   5000|
| Very Thin|Cell Phone|   6000|
|       Big|    Tablet|   2500|
|  Bendable|Cell Phone|   3000|
|  Foldable|Cell Phone|   3000|
|       Pro|    Tablet|   4500|
|      Pro2|    Tablet|   6500|
+----------+----------+-------+



In [41]:
productBestSell = (
    spark.sql("""
    select * , rank() over (partition by category order by revenue) as rank from product
    """)
).filter(col("rank")<=2)
productBestSell.createOrReplaceTempView("productBestSell")
productBestSell.show()

+--------+----------+-------+----+
| product|  category|revenue|rank|
+--------+----------+-------+----+
|Bendable|Cell Phone|   3000|   1|
|Foldable|Cell Phone|   3000|   1|
|  Normal|    Tablet|   1500|   1|
|     Big|    Tablet|   2500|   2|
+--------+----------+-------+----+



In [47]:
productDiff = (
    spark.sql("""
    select * , max(revenue) over(partition by category order by revenue desc) - revenue as difference from product
    """).show()
)

+----------+----------+-------+----------+
|   product|  category|revenue|difference|
+----------+----------+-------+----------+
|      Thin|Cell Phone|   6000|       0.0|
| Very Thin|Cell Phone|   6000|       0.0|
|Ultra Thin|Cell Phone|   5000|    1000.0|
|  Bendable|Cell Phone|   3000|    3000.0|
|  Foldable|Cell Phone|   3000|    3000.0|
|      Pro2|    Tablet|   6500|       0.0|
|      Mini|    Tablet|   5500|    1000.0|
|       Pro|    Tablet|   4500|    2000.0|
|       Big|    Tablet|   2500|    4000.0|
|    Normal|    Tablet|   1500|    5000.0|
+----------+----------+-------+----------+

