In [1]:
import os
os.environ['SPARK_HOME'] = r"C:\Spark\spark-3.5.0-bin-hadoop3"
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python'

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
        .appName("PySpark hands-on") \
        .getOrCreate()

In [4]:
import pyspark
from pyspark.sql import SparkSession

In [5]:
data = [("Alice", 25), ("Bob", 30),  ("Charlie",40)]
df = spark.createDataFrame(data, ["Name","Age"])
df.show()

+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 40|
+-------+---+



In [6]:
from pyspark import SparkContext
sc=SparkContext.getOrCreate()

In [7]:
tesla_file  = 'TSLA.csv'
tesla_rdd = sc.textFile(tesla_file)
tesla_rdd.take(5)

['Date,Open,High,Low,Close,AdjClose,Volume',
 '2019-07-15,248.000000,254.419998,244.860001,253.500000,253.500000,11000100',
 '2019-07-16,249.300003,253.529999,247.929993,252.380005,252.380005,8149000',
 '2019-07-17,255.669998,258.309998,253.350006,254.860001,254.860001,9764700',
 '2019-07-18,255.050003,255.750000,251.889999,253.539993,253.539993,4764500']

In [8]:
type(tesla_rdd)

pyspark.rdd.RDD

In [9]:
tesla_csv_rdd = tesla_rdd.map(lambda row: row.split(","))
print(tesla_csv_rdd.take(2))
print(type(tesla_csv_rdd))

[['Date', 'Open', 'High', 'Low', 'Close', 'AdjClose', 'Volume'], ['2019-07-15', '248.000000', '254.419998', '244.860001', '253.500000', '253.500000', '11000100']]
<class 'pyspark.rdd.PipelinedRDD'>


In [10]:
len(tesla_csv_rdd.take(1)[0])

7

In [11]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)




In [12]:
tesla_df = sqlContext.read.load(tesla_file, 
                      format='com.databricks.spark.csv', 
                      header='true', 
                      inferSchema='true')

In [13]:
tesla_df.count()

253

In [14]:
tesla_df.take(10)

[Row(Date=datetime.date(2019, 7, 15), Open=248.0, High=254.419998, Low=244.860001, Close=253.5, AdjClose=253.5, Volume=11000100),
 Row(Date=datetime.date(2019, 7, 16), Open=249.300003, High=253.529999, Low=247.929993, Close=252.380005, AdjClose=252.380005, Volume=8149000),
 Row(Date=datetime.date(2019, 7, 17), Open=255.669998, High=258.309998, Low=253.350006, Close=254.860001, AdjClose=254.860001, Volume=9764700),
 Row(Date=datetime.date(2019, 7, 18), Open=255.050003, High=255.75, Low=251.889999, Close=253.539993, AdjClose=253.539993, Volume=4764500),
 Row(Date=datetime.date(2019, 7, 19), Open=255.690002, High=259.959991, Low=254.619995, Close=258.179993, AdjClose=258.179993, Volume=7048400),
 Row(Date=datetime.date(2019, 7, 22), Open=258.75, High=262.149994, Low=254.190002, Close=255.679993, AdjClose=255.679993, Volume=6842400),
 Row(Date=datetime.date(2019, 7, 23), Open=256.709991, High=260.480011, Low=254.5, Close=260.170013, AdjClose=260.170013, Volume=5023100),
 Row(Date=datetime.

In [15]:
tesla_df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- AdjClose: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [16]:
!pip install pandas

import pandas




In [17]:
tesla_df.toPandas().head(10)

Unnamed: 0,Date,Open,High,Low,Close,AdjClose,Volume
0,2019-07-15,248.0,254.419998,244.860001,253.5,253.5,11000100
1,2019-07-16,249.300003,253.529999,247.929993,252.380005,252.380005,8149000
2,2019-07-17,255.669998,258.309998,253.350006,254.860001,254.860001,9764700
3,2019-07-18,255.050003,255.75,251.889999,253.539993,253.539993,4764500
4,2019-07-19,255.690002,259.959991,254.619995,258.179993,258.179993,7048400
5,2019-07-22,258.75,262.149994,254.190002,255.679993,255.679993,6842400
6,2019-07-23,256.709991,260.480011,254.5,260.170013,260.170013,5023100
7,2019-07-24,259.170013,266.070007,258.160004,264.880005,264.880005,11072800
8,2019-07-25,233.5,234.5,225.550003,228.820007,228.820007,22418300
9,2019-07-26,226.919998,230.259995,222.25,228.039993,228.039993,10027700


## Loading tesla file and converting it to dataframe

In [18]:
tesla_df.show()

+----------+----------+----------+----------+----------+----------+--------+
|      Date|      Open|      High|       Low|     Close|  AdjClose|  Volume|
+----------+----------+----------+----------+----------+----------+--------+
|2019-07-15|     248.0|254.419998|244.860001|     253.5|     253.5|11000100|
|2019-07-16|249.300003|253.529999|247.929993|252.380005|252.380005| 8149000|
|2019-07-17|255.669998|258.309998|253.350006|254.860001|254.860001| 9764700|
|2019-07-18|255.050003|    255.75|251.889999|253.539993|253.539993| 4764500|
|2019-07-19|255.690002|259.959991|254.619995|258.179993|258.179993| 7048400|
|2019-07-22|    258.75|262.149994|254.190002|255.679993|255.679993| 6842400|
|2019-07-23|256.709991|260.480011|     254.5|260.170013|260.170013| 5023100|
|2019-07-24|259.170013|266.070007|258.160004|264.880005|264.880005|11072800|
|2019-07-25|     233.5|     234.5|225.550003|228.820007|228.820007|22418300|
|2019-07-26|226.919998|230.259995|    222.25|228.039993|228.039993|10027700|

## Loading Google file and converting it to dataframe

In [19]:
google_file = 'GOOG.csv'

google_df = sqlContext.read.load(google_file, 
                      format='com.databricks.spark.csv', 
                      header='true', 
                      inferSchema='true')

google_df.show()

+----------+-----------+-----------+-----------+-----------+-----------+-------+
|      Date|       Open|       High|        Low|      Close|   AdjClose| Volume|
+----------+-----------+-----------+-----------+-----------+-----------+-------+
|2019-07-15|1146.859985|1150.819946|1139.400024|1150.339966|1150.339966| 903800|
|2019-07-16|     1146.0|1158.579956|     1145.0|1153.579956|1153.579956|1238800|
|2019-07-17|1150.969971|1158.359985| 1145.77002|1146.349976|1146.349976|1170000|
|2019-07-18| 1141.73999| 1147.60498| 1132.72998|1146.329956|1146.329956|1291300|
|2019-07-19|1148.189941|1151.140015|1129.619995|1130.099976|1130.099976|1647200|
|2019-07-22|1133.449951|    1139.25| 1124.23999|1138.069946|1138.069946|1301500|
|2019-07-23|     1144.0|1146.900024|1131.800049|1146.209961|1146.209961|1093700|
|2019-07-24|1131.900024|     1144.0| 1126.98999|1137.810059|1137.810059|1589800|
|2019-07-25|1137.819946|1141.699951|1120.920044|1132.119995|1132.119995|2209800|
|2019-07-26|1224.040039|1265

## Loading Amazon file and converting it to dataframe

In [20]:
amazon_file = 'AMZN.csv'

amazon_df = sqlContext.read.load(amazon_file, 
                      format='com.databricks.spark.csv', 
                      header='true', 
                      inferSchema='true')

amazon_df.show()

+----------+-----------+-----------+-----------+-----------+-----------+-------+
|      Date|       Open|       High|        Low|      Close|   AdjClose| Volume|
+----------+-----------+-----------+-----------+-----------+-----------+-------+
|2019-07-15|2021.400024|2022.900024|2001.550049| 2020.98999| 2020.98999|2981300|
|2019-07-16|2010.579956|2026.319946|2001.219971|2009.900024|2009.900024|2618200|
|2019-07-17|2007.050049|     2012.0|1992.030029|1992.030029|1992.030029|2558800|
|2019-07-18| 1980.01001|     1987.5|1951.550049|1977.900024|1977.900024|3504300|
|2019-07-19|1991.209961|     1996.0| 1962.22998| 1964.52002| 1964.52002|3185600|
|2019-07-22|1971.140015|     1989.0| 1958.26001|1985.630005|1985.630005|2900000|
|2019-07-23| 1995.98999|1997.790039|1973.130005| 1994.48999| 1994.48999|2703500|
|2019-07-24|1969.300049|2001.300049|1965.869995|2000.810059|2000.810059|2631300|
|2019-07-25|     2001.0|2001.199951|1972.719971|1973.819946|1973.819946|4136500|
|2019-07-26|     1942.0|1950

## Average closing price per year for AMZN

In [21]:
import datetime
from pyspark.sql.functions import year, month, dayofmonth

amazon_df.select(year("Date").alias("year"), "AdjClose").groupby("year").avg("AdjClose").sort("year").show()

+----+------------------+
|year|     avg(AdjClose)|
+----+------------------+
|2019|1800.6161329411755|
|2020| 2236.414478798507|
+----+------------------+



## Compute the average closing price per month for amazon stock

In [22]:
amazon_df.select(year("Date").alias("year"),
                month("Date").alias("month"),
                "AdjClose").groupby("year", "month").avg("AdjClose").sort("year", "month").show()

+----+-----+------------------+
|year|month|     avg(AdjClose)|
+----+-----+------------------+
|2019|    7|1964.6846265384618|
|2019|    8|1793.6027220909093|
|2019|    9|     1799.12099615|
|2019|   10|1752.3317498695653|
|2019|   11|      1774.2939941|
|2019|   12|1785.7728446190476|
|2020|    1|1884.2376128571425|
|2020|    2|2066.1752672631574|
|2020|    3|1872.3104358636365|
|2020|    4|2228.7052408571426|
|2020|    5|2394.1840209499996|
|2020|    6|      2613.5454545|
|2020|    7| 3053.100016222222|
+----+-----+------------------+



# Register the DataFrames as  temp views

In [23]:
amazon_df.registerTempTable("amazon_stocks")
google_df.registerTempTable("google_stocks")
tesla_df.registerTempTable("tesla_stocks")



# Calculate and display the average closing price per month for amazon stock ordered by year,month 

In [24]:
sqlContext.sql("""SELECT year(amazon_stocks.Date) as yr, \
                month(amazon_stocks.Date) as mo, \
                avg(amazon_stocks.AdjClose) \
                from amazon_stocks \
                group By year(amazon_stocks.Date), month(amazon_stocks.Date)""").show()



+----+---+------------------+
|  yr| mo|     avg(AdjClose)|
+----+---+------------------+
|2019| 10|1752.3317498695653|
|2020|  6|      2613.5454545|
|2020|  3|1872.3104358636365|
|2019|  8|1793.6027220909093|
|2020|  4|2228.7052408571426|
|2020|  1|1884.2376128571425|
|2019|  9|     1799.12099615|
|2019| 12|1785.7728446190476|
|2020|  7| 3053.100016222222|
|2020|  2|2066.1752672631574|
|2019|  7|1964.6846265384618|
|2019| 11|      1774.2939941|
|2020|  5|2394.1840209499996|
+----+---+------------------+



# When did the closing price for google stock go up or down by more than 2 dollars?

In [25]:

sqlContext.sql("SELECT google_stocks.Date, google_stocks.Open, google_stocks.Close, abs(google_stocks.Close - google_stocks.Open) as diff \
                FROM google_stocks \
                WHERE abs(google_stocks.Close - google_stocks.Open) > 4 ").show()



+----------+-----------+-----------+------------------+
|      Date|       Open|      Close|              diff|
+----------+-----------+-----------+------------------+
|2019-07-16|     1146.0|1153.579956| 7.579956000000038|
|2019-07-17|1150.969971|1146.349976| 4.619995000000017|
|2019-07-18| 1141.73999|1146.329956| 4.589966000000004|
|2019-07-19|1148.189941|1130.099976| 18.08996500000012|
|2019-07-22|1133.449951|1138.069946| 4.619995000000017|
|2019-07-24|1131.900024|1137.810059|  5.91003499999988|
|2019-07-25|1137.819946|1132.119995|5.6999510000000555|
|2019-07-26|1224.040039|1250.410034|26.369995000000017|
|2019-07-31|     1223.0|1216.680054| 6.319946000000073|
|2019-08-01|1214.030029| 1209.01001|5.0200190000000475|
|2019-08-02| 1200.73999| 1193.98999|              6.75|
|2019-08-05|1170.040039|1152.319946|17.720092999999906|
|2019-08-06|1163.310059|1169.949951| 6.639892000000145|
|2019-08-07|     1156.0| 1173.98999|17.989990000000034|
|2019-08-08|1182.829956|1204.800049|21.970092999

# What was the max, min closing price for Tesla stock by Year?

In [26]:
sqlContext.sql("SELECT year(tesla_stocks.Date) as yr, max(tesla_stocks.AdjClose), min(tesla_stocks.AdjClose) \
                FROM tesla_stocks \
                group By year(tesla_stocks.Date)").show()


+----+-------------+-------------+
|  yr|max(AdjClose)|min(AdjClose)|
+----+-------------+-------------+
|2019|   430.940002|   211.399994|
|2020|  1544.650024|   361.220001|
+----+-------------+-------------+



## How to check physical plan of Spark SQL 

In [27]:
sqlContext.sql("SELECT year(tesla_stocks.Date) as yr, max(tesla_stocks.AdjClose), min(tesla_stocks.AdjClose) \
                FROM tesla_stocks \
                group By year(tesla_stocks.Date)").explain()



== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[_groupingexpression#404], functions=[max(AdjClose#35), min(AdjClose#35)])
   +- Exchange hashpartitioning(_groupingexpression#404, 200), ENSURE_REQUIREMENTS, [plan_id=395]
      +- HashAggregate(keys=[_groupingexpression#404], functions=[partial_max(AdjClose#35), partial_min(AdjClose#35)])
         +- Project [AdjClose#35, year(Date#30) AS _groupingexpression#404]
            +- FileScan csv [Date#30,AdjClose#35] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/bhupi/PySpark_Tutorial/myenv/Scripts/TSLA.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Date:date,AdjClose:double>




# Join Tesla, Google, Amazon stock closing prices for comparison

In [28]:
joinclose=sqlContext.sql("SELECT tesla_stocks.Date, \
                          tesla_stocks.AdjClose as teslaclose, \
                          amazon_stocks.AdjClose as amazonclose, \
                          google_stocks.AdjClose as googleclose \
                          from tesla_stocks \
                          join google_stocks on tesla_stocks.Date = google_stocks.Date \
                          join amazon_stocks on tesla_stocks.Date = amazon_stocks.Date").cache()

joinclose.show()

joinclose.registerTempTable("joinclose")

+----------+----------+-----------+-----------+
|      Date|teslaclose|amazonclose|googleclose|
+----------+----------+-----------+-----------+
|2019-07-15|     253.5| 2020.98999|1150.339966|
|2019-07-16|252.380005|2009.900024|1153.579956|
|2019-07-17|254.860001|1992.030029|1146.349976|
|2019-07-18|253.539993|1977.900024|1146.329956|
|2019-07-19|258.179993| 1964.52002|1130.099976|
|2019-07-22|255.679993|1985.630005|1138.069946|
|2019-07-23|260.170013| 1994.48999|1146.209961|
|2019-07-24|264.880005|2000.810059|1137.810059|
|2019-07-25|228.820007|1973.819946|1132.119995|
|2019-07-26|228.039993|1943.050049|1250.410034|
|2019-07-29|235.770004|1912.449951|1239.410034|
|2019-07-30|242.259995|1898.530029|1225.140015|
|2019-07-31|241.610001|1866.780029|1216.680054|
|2019-08-01|233.850006|1855.319946| 1209.01001|
|2019-08-02|234.339996| 1823.23999| 1193.98999|
|2019-08-05|228.320007|1765.130005|1152.319946|
|2019-08-06|    230.75|1787.829956|1169.949951|
|2019-08-07|233.419998|1793.400024| 1173

## Join Tesla, Google, Amazon stock closing prices for comparison yearly

In [29]:
sqlContext.sql("SELECT year(joinclose.Date) as yr, \
                avg(joinclose.teslaclose) as teslaclose, \
                avg(joinclose.amazonclose) as amazonclose, \
                avg(joinclose.googleclose) as googleclose \
                from joinclose \
                group By year(joinclose.Date) \
                order by year(joinclose.Date)").show()



+----+------------------+------------------+------------------+
|  yr|        teslaclose|       amazonclose|       googleclose|
+----+------------------+------------------+------------------+
|2019|283.62126070588243|1800.6161329411755|1245.3833654621849|
|2020| 761.8206739179104| 2236.414478798507|1362.8286906865671|
+----+------------------+------------------+------------------+



## Creating dataframe by joining all the tables

In [30]:
joinclose_df = sqlContext.sql("SELECT tesla_stocks.Date, \
                          tesla_stocks.AdjClose as teslaclose, \
                          amazon_stocks.AdjClose as amazonclose, \
                          google_stocks.AdjClose as googleclose \
                          from tesla_stocks \
                          join google_stocks on tesla_stocks.Date = google_stocks.Date \
                          join amazon_stocks on tesla_stocks.Date = amazon_stocks.Date")

In [31]:
joinclose_df.show()

+----------+----------+-----------+-----------+
|      Date|teslaclose|amazonclose|googleclose|
+----------+----------+-----------+-----------+
|2019-07-15|     253.5| 2020.98999|1150.339966|
|2019-07-16|252.380005|2009.900024|1153.579956|
|2019-07-17|254.860001|1992.030029|1146.349976|
|2019-07-18|253.539993|1977.900024|1146.329956|
|2019-07-19|258.179993| 1964.52002|1130.099976|
|2019-07-22|255.679993|1985.630005|1138.069946|
|2019-07-23|260.170013| 1994.48999|1146.209961|
|2019-07-24|264.880005|2000.810059|1137.810059|
|2019-07-25|228.820007|1973.819946|1132.119995|
|2019-07-26|228.039993|1943.050049|1250.410034|
|2019-07-29|235.770004|1912.449951|1239.410034|
|2019-07-30|242.259995|1898.530029|1225.140015|
|2019-07-31|241.610001|1866.780029|1216.680054|
|2019-08-01|233.850006|1855.319946| 1209.01001|
|2019-08-02|234.339996| 1823.23999| 1193.98999|
|2019-08-05|228.320007|1765.130005|1152.319946|
|2019-08-06|    230.75|1787.829956|1169.949951|
|2019-08-07|233.419998|1793.400024| 1173

## Saving dataframe as parquet file

In [32]:
joinclose_df.write.format("parquet").save("joinStock.parquet")

In [33]:
final_df = sqlContext.read.parquet("joinStock.parquet")

In [34]:
final_df.show()

+----------+----------+-----------+-----------+
|      Date|teslaclose|amazonclose|googleclose|
+----------+----------+-----------+-----------+
|2019-07-15|     253.5| 2020.98999|1150.339966|
|2019-07-16|252.380005|2009.900024|1153.579956|
|2019-07-17|254.860001|1992.030029|1146.349976|
|2019-07-18|253.539993|1977.900024|1146.329956|
|2019-07-19|258.179993| 1964.52002|1130.099976|
|2019-07-22|255.679993|1985.630005|1138.069946|
|2019-07-23|260.170013| 1994.48999|1146.209961|
|2019-07-24|264.880005|2000.810059|1137.810059|
|2019-07-25|228.820007|1973.819946|1132.119995|
|2019-07-26|228.039993|1943.050049|1250.410034|
|2019-07-29|235.770004|1912.449951|1239.410034|
|2019-07-30|242.259995|1898.530029|1225.140015|
|2019-07-31|241.610001|1866.780029|1216.680054|
|2019-08-01|233.850006|1855.319946| 1209.01001|
|2019-08-02|234.339996| 1823.23999| 1193.98999|
|2019-08-05|228.320007|1765.130005|1152.319946|
|2019-08-06|    230.75|1787.829956|1169.949951|
|2019-08-07|233.419998|1793.400024| 1173