In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse an existing) Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName('operations')
    .getOrCreate()
)

In [3]:
# Load the stock prices; first row is the header, column types are inferred.
sdf = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('./Data/stock.csv')
)

In [4]:
# Print the column names and the types inferred from the CSV.
sdf.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [5]:
# Preview the first five rows.
sdf.show(n=5)

+----------+----------+----------+------------------+------------------+---------+------------------+
|      Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+----------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+----------+----------+----------+------------------+------------------+---------+------------------+
only showing top 5 rows



In [6]:
# Filter with a SQL-style string predicate (`where` is an alias of `filter`).
sdf.where('Close < 220').show(n=5)

+----------+----------+----------+------------------+------------------+---------+------------------+
|      Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+----------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+----------+----------+----------+------------------+------------------+---------+------------------+
only showing top 5 rows



In [7]:
# String predicate, then keep only the price columns.
sdf.where('Close < 210').select('Open', 'High', 'Low', 'Close').show(n=5)

+------------------+------------------+----------+----------+
|              Open|              High|       Low|     Close|
+------------------+------------------+----------+----------+
|209.18999499999998|209.76999500000002|206.419998|207.720001|
|210.11000299999998|210.45999700000002|209.020004|    209.43|
|210.92999500000002|211.59999700000003|205.869999|    205.93|
|        212.079994|213.30999599999998|207.210003|208.069996|
|206.78000600000001|        207.499996|    197.16|    197.75|
+------------------+------------------+----------+----------+
only showing top 5 rows



In [8]:
# Same query as above, but the predicate is a Column expression instead of a string.
(sdf
 .where(sdf['Close'] < 210)
 .select('Open', 'High', 'Low', 'Close')
 .show(n=5))

+------------------+------------------+----------+----------+
|              Open|              High|       Low|     Close|
+------------------+------------------+----------+----------+
|209.18999499999998|209.76999500000002|206.419998|207.720001|
|210.11000299999998|210.45999700000002|209.020004|    209.43|
|210.92999500000002|211.59999700000003|205.869999|    205.93|
|        212.079994|213.30999599999998|207.210003|208.069996|
|206.78000600000001|        207.499996|    197.16|    197.75|
+------------------+------------------+----------+----------+
only showing top 5 rows



In [9]:
# Combine Column predicates with `&`; each side must be parenthesised because
# `&` binds tighter than `<` / `>` in Python.
(sdf
 .where((sdf['Close'] < 200) & (sdf['Open'] > 200))
 .select('Open', 'High', 'Low', 'Close')
 .show(n=5))

+------------------+----------+----------+----------+
|              Open|      High|       Low|     Close|
+------------------+----------+----------+----------+
|206.78000600000001|207.499996|    197.16|    197.75|
|        204.930004|205.500004|198.699995|199.289995|
|        201.079996|202.199995|190.250002|192.060003|
+------------------+----------+----------+----------+



In [10]:
# `~` negates a Column predicate and binds tighter than `&`, so this keeps rows
# whose Close is NOT below 200 and whose Open is above 200. The extra parentheses
# make that grouping explicit.
(sdf
 .where((~(sdf['Close'] < 200)) & (sdf['Open'] > 200))
 .select('Open', 'High', 'Low', 'Close')
 .show(n=5))

+----------+----------+------------------+------------------+
|      Open|      High|               Low|             Close|
+----------+----------+------------------+------------------+
|213.429998|214.499996|212.38000099999996|        214.009998|
|214.599998|215.589994|        213.249994|        214.379993|
|214.379993|    215.23|        210.750004|        210.969995|
|    211.75|212.000006|        209.050005|            210.58|
|210.299994|212.000006|209.06000500000002|211.98000499999998|
+----------+----------+------------------+------------------+
only showing top 5 rows



In [11]:
# Exact float equality works here because 211.75 is exactly representable in
# binary floating point. For arbitrary prices, prefer a tolerance comparison.
sdf.where(sdf['Open'] == 211.75).select('Open', 'High', 'Low', 'Close').show(n=5)

+------+----------+----------+------+
|  Open|      High|       Low| Close|
+------+----------+----------+------+
|211.75|212.000006|209.050005|210.58|
+------+----------+----------+------+



In [12]:
# Bring the matching rows back to the driver as a Python list of Row objects.
# NOTE(review): the name `open` shadows Python's built-in open(). Later cells read
# this variable, so rename it in all of them together (e.g. to `open_rows`).
open = sdf.filter(sdf['Open'] == 211.75).select(['Open', 'High', 'Low', 'Close']).collect()

In [13]:
# Display the collected list of Row objects.
open

[Row(Open=211.75, High=212.000006, Low=209.050005, Close=210.58)]

In [14]:
# First (and here only) collected Row.
open[0]

Row(Open=211.75, High=212.000006, Low=209.050005, Close=210.58)

In [15]:
# Convert the Row into a plain dict keyed by column name.
open[0].asDict()

{'Open': 211.75, 'High': 212.000006, 'Low': 209.050005, 'Close': 210.58}

In [16]:
# Look up one field by column name in the Row-as-dict (a missing key raises KeyError):
open[0].asDict()['High']

212.000006

In [17]:
# Load the sales table; first row is the header, column types are inferred.
sales = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('./Data/sales.csv')
)

In [18]:
# Preview the first five sales rows.
sales.show(n=5)

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
+-------+-------+-----+
only showing top 5 rows



In [19]:
# Print the column names and the types inferred for the sales table.
sales.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [20]:
# Per-company average of every numeric column (only Sales in this table).
sales.groupBy('Company').mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [21]:
# Number of rows per company.
sales.groupBy('Company').count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



In [22]:
# Per-company total of the numeric columns.
sales.groupBy('Company').sum().show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [23]:
# Per-company minimum of the numeric columns.
sales.groupBy('Company').min().show()

+-------+----------+
|Company|min(Sales)|
+-------+----------+
|   APPL|     130.0|
|   GOOG|     120.0|
|     FB|     350.0|
|   MSFT|     124.0|
+-------+----------+



In [24]:
# Per-company maximum of the numeric columns.
sales.groupBy('Company').max().show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [25]:
# Dict form of agg(): maps column name -> aggregate function name.
sales.groupBy('Company').agg({
    'Sales': 'sum',
    'Person': 'max',
}).show()

+-------+----------+-----------+
|Company|sum(Sales)|max(Person)|
+-------+----------+-----------+
|   APPL|    1480.0|       Mike|
|     FB|    1220.0|      Sarah|
|   GOOG|     660.0|        Sam|
|   MSFT|     967.0|    Vanessa|
+-------+----------+-----------+



In [26]:
from pyspark.sql.functions import countDistinct, avg, stddev

In [27]:
# Whole-table aggregate: number of distinct Sales values.
sales.agg(countDistinct('Sales')).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+



In [28]:
# Whole-table average of Sales, with a readable column header.
sales.agg(avg('Sales').alias('Average Sales')).show()

+-----------------+
|    Average Sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [29]:
# Sample standard deviation of Sales (default header: stddev_samp(Sales)).
sales.agg(stddev('Sales')).show()

+------------------+
|stddev_samp(Sales)|
+------------------+
|250.08742410799007|
+------------------+



In [30]:
from pyspark.sql.functions import format_number

In [31]:
# Same statistic, renamed to a short header.
sales.agg(stddev('Sales').alias('STD')).show()

+------------------+
|               STD|
+------------------+
|250.08742410799007|
+------------------+



In [32]:
# format_number renders the value as a string rounded to `d` decimal places.
sales.select(
    format_number(stddev('Sales'), d=2).alias('Standard Deviation')
).show()

+------------------+
|Standard Deviation|
+------------------+
|            250.09|
+------------------+



In [33]:
# Ascending sort by Sales (`sort` is an alias of `orderBy`).
sales.sort('Sales').show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [34]:
# Descending sort, expressed with the Column's desc() ordering.
sales.sort(sales['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



In [35]:
# Load the small table that contains NULLs; header row plus inferred types.
data = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('./Data/containsNULL.csv')
)

In [36]:
# Preview up to five rows.
data.show(n=5)

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [37]:
# Print the column names and the types inferred for the NULL-containing table.
data.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [38]:
# Default how='any': drop every row that contains at least one NULL.
data.dropna().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [39]:
# Keep only rows that have at least 2 non-NULL values.
data.dropna(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [40]:
data.dropna(how='all').show()  # drop a row only when every one of its columns is NULL

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [41]:
# Consider only the Sales column when deciding which rows to drop.
data.dropna(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [42]:
# Consider only the Name column when deciding which rows to drop.
data.dropna(subset=['Name']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp4|Cindy|456.0|
+----+-----+-----+



In [43]:
# Replace NULL names with a placeholder string; other columns are untouched.
data.fillna('No Name', subset=['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [44]:
# Replace NULL Sales with zero.
data.fillna(0, subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [45]:
from pyspark.sql.functions import mean

In [46]:
# Mean of the non-NULL Sales values, collected to the driver as a one-row list.
average = data.agg(mean('Sales')).collect()
average

[Row(avg(Sales)=400.5)]

In [47]:
# Unwrap the scalar: first column of the first (only) collected Row.
# NOTE(review): this would be None if every Sales value were NULL, which is not
# usable as a fill value. Confirm the data can never be all-NULL.
mean_value = average[0][0]
mean_value

400.5

In [48]:
# Mean imputation: replace NULL Sales with the column average computed above.
data.fillna(mean_value, subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+

