In [1]:
import findspark
findspark.init('/home/harkirat/spark-3.3.1-bin-hadoop3')

In [2]:
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Basics').getOrCreate()

22/12/21 13:47:17 WARN Utils: Your hostname, harkirat-QEMU-Virtual-Machine resolves to a loopback address: 127.0.1.1; using 192.168.64.16 instead (on interface enp0s6)
22/12/21 13:47:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/21 13:47:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df=spark.read.json('Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json')

In [4]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [5]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [6]:
df.columns

['age', 'name']

In [7]:
df.describe()

DataFrame[summary: string, age: string, name: string]

In [8]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [9]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [10]:
data_schema=[StructField('age', IntegerType(), True),
            StructField('name', StringType(), True)]

In [11]:
final_struct=StructType(fields=data_schema)

In [12]:
df=spark.read.json('Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json', schema=final_struct)

In [13]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [14]:
# select a single column

df.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [15]:
df.head(2)

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [16]:
df.head(2)[0]

Row(age=None, name='Michael')

In [17]:
# add a new column

df.withColumn('double_age', df['age']*2).show()

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



In [18]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [19]:
# rename a column

df.withColumnRenamed('age', 'my_new_age').show()

+----------+-------+
|my_new_age|   name|
+----------+-------+
|      null|Michael|
|        30|   Andy|
|        19| Justin|
+----------+-------+



In [20]:
# make df into SQL table

df.createOrReplaceTempView('people')
results=spark.sql("select * from people")
results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [21]:
new_results=spark.sql('select * from people where age=30')
new_results.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



# Basic Operations

In [22]:
spark=SparkSession.builder.appName('ops')

In [23]:
spark=SparkSession.builder.appName('ops').getOrCreate()

22/12/21 13:47:26 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [24]:
df=spark.read.csv('Python-and-Spark-for-Big-Data-master/Spark_DataFrames/appl_stock.csv', inferSchema=True, header=True)

In [25]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [26]:
df.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [27]:
df.createOrReplaceTempView('df')

In [28]:
spark.sql('select volume from df where close<200 and open>200').show()

+---------+
|   volume|
+---------+
|220441900|
|293375600|
|311488100|
+---------+



# GroupBy & Aggregate

In [29]:
spark=SparkSession.builder.appName('aggs').getOrCreate()

In [30]:
df=spark.read.csv('Python-and-Spark-for-Big-Data-master/Spark_DataFrames/sales_info.csv', inferSchema=True, header=True)

22/12/21 13:47:27 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [31]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [32]:
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [33]:
df.createOrReplaceTempView('df')

In [34]:
x=spark.sql('select sum(Sales), Company from df group by Company')
x.show()

+----------+-------+
|sum(Sales)|Company|
+----------+-------+
|    1480.0|   APPL|
|     660.0|   GOOG|
|    1220.0|     FB|
|     967.0|   MSFT|
+----------+-------+



In [35]:
x=spark.sql('select count(Sales), Company from df group by Company')
x.show()

+------------+-------+
|count(Sales)|Company|
+------------+-------+
|           4|   APPL|
|           3|   GOOG|
|           2|     FB|
|           3|   MSFT|
+------------+-------+



In [36]:
spark.sql('select sum(sales) from df').show()

+----------+
|sum(sales)|
+----------+
|    4327.0|
+----------+



In [37]:
spark.sql('select count(distinct sales) as distinct_sales from df').show()

+--------------+
|distinct_sales|
+--------------+
|            11|
+--------------+



In [38]:
spark.sql('select round(stddev(sales),2) as std_deviation from df').show()

+-------------+
|std_deviation|
+-------------+
|       250.09|
+-------------+



In [39]:
spark.sql('select * from df order by sales desc').show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



# Missing Data

In [40]:
spark=SparkSession.builder.appName('miss').getOrCreate()

22/12/21 13:47:30 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [41]:
df=spark.read.csv('Python-and-Spark-for-Big-Data-master/Spark_DataFrames/ContainsNull.csv', inferSchema=True, header=True)

In [42]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [43]:
df.createOrReplaceTempView('df')

In [44]:
spark.sql('select * from df where Name is not null and Sales is not null').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [45]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [46]:
# filling missing values

df.na.fill('FILL VALUE').show()

+----+----------+-----+
|  Id|      Name|Sales|
+----+----------+-----+
|emp1|      John| null|
|emp2|FILL VALUE| null|
|emp3|FILL VALUE|345.0|
|emp4|     Cindy|456.0|
+----+----------+-----+



In [47]:
df.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [48]:
df.na.fill('No Name', subset=['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [49]:
df.createOrReplaceTempView('df')

In [50]:
x=spark.sql('select mean(sales) from df').collect()
x

[Row(mean(sales)=400.5)]

In [51]:
mean_sales=x[0][0]
mean_sales

400.5

In [52]:
df.na.fill(mean_sales,['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [53]:
# filling missing values using SQL, cannot fill missing numeric values with mean

spark.sql('select ifnull(Name, "No Name") as Name, Sales from df').show()

+-------+-----+
|   Name|Sales|
+-------+-----+
|   John| null|
|No Name| null|
|No Name|345.0|
|  Cindy|456.0|
+-------+-----+



In [54]:
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [55]:
# show subset of Sales column without null

df.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [56]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



# Dates & Timestamps

In [57]:
spark=SparkSession.builder.appName('dates').getOrCreate()
df=spark.read.csv('Python-and-Spark-for-Big-Data-master/Spark_DataFrames/appl_stock.csv', inferSchema=True, header=True)
df.createOrReplaceTempView('df')

22/12/21 13:47:31 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [58]:
df.select(['Date','Open']).show()

+-------------------+------------------+
|               Date|              Open|
+-------------------+------------------+
|2010-01-04 00:00:00|        213.429998|
|2010-01-05 00:00:00|        214.599998|
|2010-01-06 00:00:00|        214.379993|
|2010-01-07 00:00:00|            211.75|
|2010-01-08 00:00:00|        210.299994|
|2010-01-11 00:00:00|212.79999700000002|
|2010-01-12 00:00:00|209.18999499999998|
|2010-01-13 00:00:00|        207.870005|
|2010-01-14 00:00:00|210.11000299999998|
|2010-01-15 00:00:00|210.92999500000002|
|2010-01-19 00:00:00|        208.330002|
|2010-01-20 00:00:00|        214.910006|
|2010-01-21 00:00:00|        212.079994|
|2010-01-22 00:00:00|206.78000600000001|
|2010-01-25 00:00:00|202.51000200000001|
|2010-01-26 00:00:00|205.95000100000001|
|2010-01-27 00:00:00|        206.849995|
|2010-01-28 00:00:00|        204.930004|
|2010-01-29 00:00:00|        201.079996|
|2010-02-01 00:00:00|192.36999699999998|
+-------------------+------------------+
only showing top

In [59]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [60]:
spark.sql('select dayofmonth(Date) as day_of_month from df').show()

+------------+
|day_of_month|
+------------+
|           4|
|           5|
|           6|
|           7|
|           8|
|          11|
|          12|
|          13|
|          14|
|          15|
|          19|
|          20|
|          21|
|          22|
|          25|
|          26|
|          27|
|          28|
|          29|
|           1|
+------------+
only showing top 20 rows



In [61]:
spark.sql('select hour(Date) as hour from df').show()

+----+
|hour|
+----+
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
|   0|
+----+
only showing top 20 rows



In [62]:
year=spark.sql('select year(Date) as year from df')
year[0]

Column<'year'>

In [63]:
year['year']

Column<'year'>

In [64]:
from pyspark.sql.functions import (dayofmonth, hour,
                                  dayofyear, month,
                                  year, weekofyear,
                                  format_number, date_format)

In [65]:
x=df.select(year(df['Date']))
x

DataFrame[year(Date): int]

In [66]:
x[0]

Column<'year(Date)'>

In [74]:
newdf=df.withColumn('Year', year(df['Date']))
result=newdf.groupBy('Year').mean().select(['Year', 'avg(Close)'])

In [75]:
result.show()

+----+------------------+
|Year|        avg(Close)|
+----+------------------+
|2015|120.03999980555547|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2012| 576.0497195640002|
|2016|104.60400786904763|
|2010| 259.8424600000002|
|2011|364.00432532142867|
+----+------------------+



In [99]:
# using SQL

spark.sql('select year(Date) as year, round(avg(Close),2) as avg_closing_price from df group by year').show()

+----+-----------------+
|year|avg_closing_price|
+----+-----------------+
|2015|            72.49|
|2013|            75.32|
|2014|            77.33|
|2012|            67.22|
|2016|            69.55|
+----+-----------------+



# Exercise

In [80]:
spark=SparkSession.builder.appName('exercise').getOrCreate()
df=spark.read.csv('Python-and-Spark-for-Big-Data-master/Spark_DataFrame_Project_Exercise/walmart_stock.csv', inferSchema=True, header=True)

22/12/21 14:17:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [81]:
df.createOrReplaceTempView('df')

In [82]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [86]:
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

In [92]:
spark.sql('select high/volume as HV_ratio from df').show()

+--------------------+
|            HV_ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



In [93]:
spark.sql('select max(volume), min(volume) from df').show()

+-----------+-----------+
|max(volume)|min(volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



In [94]:
spark.sql('select count(close) from df where close<60').show()

+------------+
|count(close)|
+------------+
|          81|
+------------+



In [100]:
spark.sql('select year(Date) as year, round(max(high),2) as max_high from df group by year').show()

+----+--------+
|year|max_high|
+----+--------+
|2015|   90.97|
|2013|   81.37|
|2014|   88.09|
|2012|    77.6|
|2016|   75.19|
+----+--------+



In [103]:
spark.sql('select month(Date) as month, round(avg(close),2) as avg_closing from df group by month order by month asc').show()

+-----+-----------+
|month|avg_closing|
+-----+-----------+
|    1|      71.45|
|    2|      71.31|
|    3|      71.78|
|    4|      72.97|
|    5|      72.31|
|    6|       72.5|
|    7|      74.44|
|    8|      73.03|
|    9|      72.18|
|   10|      71.58|
|   11|      72.11|
|   12|      72.85|
+-----+-----------+



In [107]:
spark.sql('select date from df order by high limit 1').show()

+-------------------+
|               date|
+-------------------+
|2015-11-13 00:00:00|
+-------------------+

