In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')

In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('Basic Operations').getOrCreate()

In [13]:
df = spark.read.csv("/home/ubuntu/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/appl_stock.csv", inferSchema=True, header=True)
# df.filter('Close  < 500').select(['Open', 'Close']).show()
# df.filter(df['Close'] < 500).select('Volume').show()
df.printSchema()
df.show()
result = df.filter((df['Close'] < 200) & ~(df['Open'] > 200)).collect()
row = result[0]
row.asDict()['Volume']

#  Spark DataFrames - GroupBy and Aggregate Functions

In [32]:
df = spark.read.csv("/home/ubuntu/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/sales_info.csv", inferSchema=True, header=True)

In [41]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [38]:
# df.groupBy("Company").mean().show()
# df.groupBy("Company").min().show()
# df.groupBy("Company").max().show()
# df.groupBy("Company").count().show()

In [46]:
df.agg({'Sales':'max'}).show()
grouped_data = df.groupBy('Company')
grouped_data.agg({'Sales':'max'}).show()

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [47]:
from pyspark.sql.functions import countDistinct,avg,stddev

In [52]:
df.select(countDistinct('Company')).show()
df.select(avg('Sales').alias('Average Sales')).show()

+-----------------------+
|count(DISTINCT Company)|
+-----------------------+
|                      4|
+-----------------------+

+-----------------+
|    Average Sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [53]:
from pyspark.sql.functions import format_number

In [57]:
sales_std = df.select(stddev('Sales').alias('std'))

In [61]:
sales_std.select(format_number('std',2)).show()

+---------------------+
|format_number(std, 2)|
+---------------------+
|               250.09|
+---------------------+



# Spark Dataframe - Missing Data 

In [63]:
df = spark.read.csv("/home/ubuntu/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/ContainsNull.csv", inferSchema=True, header=True)

In [64]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [68]:
# df.na.drop(thresh=2).show()
df.na.drop(how='all').show()
df.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [70]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [73]:
df.na.fill("Fill Value").show()
df.na.fill(0).show()

+----+----------+-----+
|  Id|      Name|Sales|
+----+----------+-----+
|emp1|      John| null|
|emp2|Fill Value| null|
|emp3|Fill Value|345.0|
|emp4|     Cindy|456.0|
+----+----------+-----+

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [77]:
df.na.fill("No Name", subset=['Name']).show()
from pyspark.sql.functions import mean
mean_sales = df.select(mean('Sales')).collect()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [78]:
mean_val = mean_sales[0][0]

In [80]:
df.na.fill(df.select(mean('Sales')).collect()[0][0], ['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



# Spark Dataframes - Handling Dates

In [5]:
df = spark.read.csv("/home/ubuntu/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/appl_stock.csv", inferSchema=True, header=True)

In [6]:
df.head(1)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)]

In [8]:
df.select(['Date', 'Open']).show()

+--------------------+------------------+
|                Date|              Open|
+--------------------+------------------+
|2010-01-04 00:00:...|        213.429998|
|2010-01-05 00:00:...|        214.599998|
|2010-01-06 00:00:...|        214.379993|
|2010-01-07 00:00:...|            211.75|
|2010-01-08 00:00:...|        210.299994|
|2010-01-11 00:00:...|212.79999700000002|
|2010-01-12 00:00:...|209.18999499999998|
|2010-01-13 00:00:...|        207.870005|
|2010-01-14 00:00:...|210.11000299999998|
|2010-01-15 00:00:...|210.92999500000002|
|2010-01-19 00:00:...|        208.330002|
|2010-01-20 00:00:...|        214.910006|
|2010-01-21 00:00:...|        212.079994|
|2010-01-22 00:00:...|206.78000600000001|
|2010-01-25 00:00:...|202.51000200000001|
|2010-01-26 00:00:...|205.95000100000001|
|2010-01-27 00:00:...|        206.849995|
|2010-01-28 00:00:...|        204.930004|
|2010-01-29 00:00:...|        201.079996|
|2010-02-01 00:00:...|192.36999699999998|
+--------------------+------------

In [9]:
from pyspark.sql.functions import (dayofmonth, hour, dayofyear, month, year, weekofyear, format_number, date_format)

In [14]:
# df.select(dayofmonth(df['Date'])).show()
# df.select(month(df['Date'])).show()
df.select(year(df['Date'])).show()

+----------+
|year(Date)|
+----------+
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
+----------+
only showing top 20 rows



In [17]:
new_df = df.withColumn("Year", year(df['Date']))

In [23]:
result = new_df.groupBy("Year").mean().select(["Year", "avg(Close)"])

In [27]:
new = result.withColumnRenamed("avg(Close)", "Average Closing Price")

In [28]:
new.select(["Year", format_number("Average Closing Price", 2)]).show()

+----+---------------------------------------+
|Year|format_number(Average Closing Price, 2)|
+----+---------------------------------------+
|2015|                                 120.04|
|2013|                                 472.63|
|2014|                                 295.40|
|2012|                                 576.05|
|2016|                                 104.60|
|2010|                                 259.84|
|2011|                                 364.00|
+----+---------------------------------------+

