In [1]:
import os
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

# Obtain the Spark session used by the filtering examples below
# (getOrCreate reuses an already-running session when there is one).
spark = (
    SparkSession.builder
    .appName('filter_ops')
    .getOrCreate()
)

In [44]:
# Load the Apple stock CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
df = spark.read.csv(
    'data/appl_stock.csv',
    header=True,
    inferSchema=True,
)

# Report (number of rows, number of columns), then the inferred schema.
n_rows = df.count()
n_cols = len(df.columns)
print('DataFrame shape : ', (n_rows, n_cols))
df.printSchema()

DataFrame shape :  (1762, 7)
root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [6]:
# Summary statistics (count, mean, stddev, min, max) for every column.
(
    df
    .describe()
    .show()
)

+-------+----------+------------------+------------------+------------------+-----------------+-------------------+------------------+
|summary|      Date|              Open|              High|               Low|            Close|             Volume|         Adj Close|
+-------+----------+------------------+------------------+------------------+-----------------+-------------------+------------------+
|  count|      1762|              1762|              1762|              1762|             1762|               1762|              1762|
|   mean|      null| 313.0763111589103| 315.9112880164581| 309.8282405079457|312.9270656379113|9.422577587968218E7| 75.00174115607275|
| stddev|      null|185.29946803981522|186.89817686485767|183.38391664371008|185.1471036170943|6.020518776592709E7| 28.57492972179906|
|    min|2010-01-04|              90.0|         90.699997|         89.470001|        90.279999|           11475900|         24.881912|
|    max|2016-12-30|        702.409988|        705.0700

In [20]:
############## Filtering ##################
# NOTE: DataFrame.show() prints the table itself and returns None, so it must
# not be wrapped in print() -- doing so emits a stray "None" after the table.

# 1) SQL Syntax: the condition is given as a SQL expression string.
# df.filter('Close < 500').show()
# df.filter('Close < 500 AND High > 200').show()
# df.filter('High = 215.23').show()

# 2) Python Syntax: the condition is built from Column objects.
#    Combine conditions with & / | and parenthesise each comparison.
# df.filter(df['Close'] < 500).show()
# df.filter( (df['Close'] < 500) & (df['High'] > 200) ).show()
df.filter(df['High'] == 215.23).show()

+----------+----------+------+----------+----------+---------+------------------+
|      Date|      Open|  High|       Low|     Close|   Volume|         Adj Close|
+----------+----------+------+----------+----------+---------+------------------+
|2010-01-06|214.379993|215.23|210.750004|210.969995|138040000|27.333178000000004|
+----------+----------+------+----------+----------+---------+------------------+

None


In [21]:
############## Select Specific Columns
# Keep only the High and Low columns of the rows whose High equals 215.23.
(
    df
    .filter(df['High'] == 215.23)
    .select('High', 'Low')
    .show()
)

+------+----------+
|  High|       Low|
+------+----------+
|215.23|210.750004|
+------+----------+



In [23]:
############# Getting row as dictionary
# collect() returns the matching rows as a Python list of Row objects.
res = (
    df
    .filter(df['High'] == 215.23)
    .collect()
)
print(res)

# Turn the first matching Row into a plain dict (displayed as the cell result).
res[0].asDict()

[Row(Date='2010-01-06', Open=214.379993, High=215.23, Low=210.750004, Close=210.969995, Volume=138040000, Adj Close=27.333178000000004)]


{'Date': '2010-01-06',
 'Open': 214.379993,
 'High': 215.23,
 'Low': 210.750004,
 'Close': 210.969995,
 'Volume': 138040000,
 'Adj Close': 27.333178000000004}

In [63]:
########## Computing average prices per year ############

from pyspark.sql.functions import year,mean,format_number

# Columns whose yearly average is reported, one table per column.
cols = ['Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

# Tag every row with the calendar year taken from its Date value.
df_new = df.withColumn('Year', year(df['Date']))

# Average every numeric column per year; the results are named 'avg(<column>)'.
group = df_new.groupBy('Year').mean().orderBy('Year')

for metric in cols:
    # Format the yearly average to two decimals under a readable header.
    yearly_avg = format_number('avg(' + metric + ')', 2).alias('Average ' + metric)
    group.select('Year', yearly_avg).show()


+----+------------+
|Year|Average Open|
+----+------------+
|2010|      259.96|
|2011|      364.06|
|2012|      576.65|
|2013|      473.13|
|2014|      295.14|
|2015|      120.18|
|2016|      104.51|
+----+------------+

+----+------------+
|Year|Average High|
+----+------------+
|2010|      262.37|
|2011|      367.42|
|2012|      581.83|
|2013|      477.64|
|2014|      297.56|
|2015|      121.24|
|2016|      105.43|
+----+------------+

+----+-----------+
|Year|Average Low|
+----+-----------+
|2010|     256.85|
|2011|     360.30|
|2012|     569.92|
|2013|     468.25|
|2014|     292.99|
|2015|     118.86|
|2016|     103.69|
+----+-----------+

+----+-------------+
|Year|Average Close|
+----+-------------+
|2010|       259.84|
|2011|       364.00|
|2012|       576.05|
|2013|       472.63|
|2014|       295.40|
|2015|       120.04|
|2016|       104.60|
+----+-------------+

+----+--------------+
|Year|Average Volume|
+----+--------------+
|2010|149,826,316.67|
|2011|123,074,741.67|
|2012|

### GROUP_BY AND AGG methods

In [25]:
import os
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

# Obtain the Spark session used by the grouping/aggregation examples below
# (getOrCreate reuses an already-running session when there is one).
spark = (
    SparkSession.builder
    .appName('group_ops')
    .getOrCreate()
)

In [26]:
# Load the sales CSV: the first row supplies the column names and Spark
# infers each column's type from the data. Then preview the rows.
df = spark.read.csv(
    'data/sales_info.csv',
    header=True,
    inferSchema=True,
)
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [27]:
# Grouping alone yields a GroupedData handle, not a DataFrame; an aggregation
# has to be applied to it before anything can be shown.
group_company = df.groupBy(df['Company'])
print(group_company)

<pyspark.sql.group.GroupedData object at 0x7f7855485710>


In [29]:
# Per company: the minimum Person value and the maximum Sales value.
group_company.agg({
    'Person': 'min',
    'Sales': 'max',
}).show()

+-------+----------+-----------+
|Company|max(Sales)|min(Person)|
+-------+----------+-----------+
|   APPL|     750.0|      Chris|
|   GOOG|     340.0|    Charlie|
|     FB|     870.0|       Carl|
|   MSFT|     600.0|        Amy|
+-------+----------+-----------+



In [31]:
# Aggregate over the whole DataFrame (no grouping): {column: aggregate name}.
# A single-metric variant would be: df.agg({'Sales':'mean'}).show()
df.agg({'Person': 'min', 'Sales': 'max'}).show()

+----------+-----------+
|max(Sales)|min(Person)|
+----------+-----------+
|     870.0|      Chris|
+----------+-----------+



In [35]:
############### Functions ##############
from pyspark.sql.functions import countDistinct,format_number,stddev

# Standard deviation of Sales, formatted to two decimals and labelled 'std'.
df.select(
    format_number(stddev('Sales'), 2).alias('std')
).show()

+------+
|   std|
+------+
|250.09|
+------+



In [38]:
################ Sorting ###################
# Ascending order (the default) would be: df.orderBy('Sales').show()
# Descending order by Sales:
df.orderBy('Sales', ascending=False).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+

