In [1]:
import findspark

In [2]:
# Tell findspark where the local Spark 2.1.1 installation lives; this must run
# before the `import pyspark` in the next cell.
# NOTE(review): absolute, machine-specific path; adjust when running elsewhere.
findspark.init('/alvaroRprocessingEBS2/Tools/spark/spark-2.1.1-bin-hadoop2.7')

In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession

In [5]:
# Session handle used by every cell below (an existing session is reused).
spark = (
    SparkSession.builder
    .appName("Basics")
    .getOrCreate()
)

In [6]:
# Load the people dataset; the schema is inferred from the JSON records.
df = spark.read.json(
    "/alvaroRprocessingEBS2/githubRepos/Spark_and_Python_for_BD_with_PySpark/Course_Notes/Spark_DataFrames/people.json"
)

In [7]:
# Print the DataFrame contents as a text table.
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [8]:
# Print each column's name, type and nullability as a tree.
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [9]:
# Column names as a plain Python list.
df.columns

['age', 'name']

In [10]:
# Summary statistics (count, mean, stddev, min, max) for each column of the DataFrame.
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



The next cells show how to specify the schema of a table explicitly instead of letting Spark infer it.

In [11]:
from pyspark.sql.types import(StructField, StringType,
                                 IntegerType, StructType)

In [12]:
# One StructField per column: name, Spark type, and whether nulls are allowed.
data_schema = [
    StructField("age", IntegerType(), nullable=True),
    StructField("name", StringType(), nullable=True),
]

In [13]:
# Wrap the field list in a StructType so it can be passed as a reader schema.
final_struct = StructType(data_schema)

In [14]:
# Re-read the same file, this time imposing the explicit schema instead of inferring one.
df = spark.read.json(
    "/alvaroRprocessingEBS2/githubRepos/Spark_and_Python_for_BD_with_PySpark/Course_Notes/Spark_DataFrames/people.json",
    schema=final_struct,
)

In [15]:
# Confirm the explicit schema took effect: `age` is now integer rather than the inferred long.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [16]:
# Indexing a DataFrame by column name yields a Column object, not the data itself.
type(df["age"])

pyspark.sql.column.Column

In [17]:
# select() returns a DataFrame containing only the requested column, which can then be shown.
df.select("age").show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [18]:
# Several columns can be selected at once by passing their names.
df.select("age", "name").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [19]:
# Show the DataFrame with an extra column `newAge` holding the same values as `age`.
# The result is a new DataFrame; `df` itself is not changed.
df.withColumn("newAge", df["age"]).show()

+----+-------+------+
| age|   name|newAge|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    30|
|  19| Justin|    19|
+----+-------+------+



In [20]:
# Show the DataFrame with column `age` renamed to `newAge`; `df` keeps its original column name.
df.withColumnRenamed("age", "newAge").show()

+------+-------+
|newAge|   name|
+------+-------+
|  null|Michael|
|    30|   Andy|
|    19| Justin|
+------+-------+



In [21]:
# Spark SQL: register the DataFrame as a temporary view named "people" so it
# can be queried with SQL through the session.
df.createOrReplaceTempView("people")
results = spark.sql("SELECT * FROM people WHERE age = 30")

In [22]:
# spark.sql() hands back an ordinary DataFrame.
type(results)

pyspark.sql.dataframe.DataFrame

In [23]:
# Display the rows matched by the SQL query.
results.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



Spark DataFrame Basic Operations.

In [24]:
# Switch datasets: Apple stock prices from CSV.
# header=True takes column names from the first line; inferSchema=True lets Spark infer column types.
df = spark.read.csv(
    "/alvaroRprocessingEBS2/githubRepos/Spark_and_Python_for_BD_with_PySpark/Course_Notes/Spark_DataFrames/appl_stock.csv",
    header=True,
    inferSchema=True,
)

In [25]:
# Preview the stock data as a text table.
df.show()

+--------------------+------------------+------------------+------------------+------------------+---------+------------------+
|                Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+--------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:...|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:...|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:...|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:...|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:...|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902

In [26]:
# Check the column types Spark inferred from the CSV.
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [27]:
# Total number of rows in the DataFrame.
df.count()

1762

In [28]:
# Keep rows whose Close is below 500 (condition given as a SQL-style string),
# then show only the Open and Close columns.
(
    df.filter("Close<500")
      .select("Open", "Close")
      .show()
)

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



In [29]:
# Same kind of filter, built from Column comparisons instead of a string.
# Each comparison needs its own parentheses because `&` binds tighter than `<` / `>`.
(
    df.where((df["Open"] > 200) & (df["Close"] < 200))
      .show()
)

+--------------------+------------------+----------+----------+----------+---------+------------------+
|                Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+--------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:...|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:...|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:...|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+--------------------+------------------+----------+----------+----------+---------+------------------+



In [30]:
# Pull the matching rows back to the driver as Python objects.
# NOTE(review): this is an exact equality test on a double column.
result = df.where(df["Low"] == 197.16).collect()

In [31]:
# collect() produced a list of Row objects (a single match here).
result

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [32]:
# Each collected element is a Row.
type(result[0])

pyspark.sql.types.Row

In [33]:
# Convert the Row into a plain dict keyed by column name.
result[0].asDict()

{'Adj Close': 25.620401,
 'Close': 197.75,
 'Date': datetime.datetime(2010, 1, 22, 0, 0),
 'High': 207.499996,
 'Low': 197.16,
 'Open': 206.78000600000001,
 'Volume': 220441900}

GroupBy and Aggregate.

In [34]:
# Switch datasets again: per-person sales figures by company.
df = spark.read.csv(
    "/alvaroRprocessingEBS2/githubRepos/Spark_and_Python_for_BD_with_PySpark/Course_Notes/Spark_DataFrames/sales_info.csv",
    header=True,
    inferSchema=True,
)

In [35]:
# Check the inferred column types of the sales data.
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [36]:
# Average of every numeric column within each company.
df.groupBy("Company").avg().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [37]:
# Number of rows per company.
df.groupBy("Company").count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



Let's now try the exercise set. Check notebook Course_Notes/Spark_DataFrame_Project_Exercise.

In [38]:
# Exercise dataset: Walmart stock prices, with column types inferred by Spark.
df = spark.read.csv(
    "/alvaroRprocessingEBS2/githubRepos/Spark_and_Python_for_BD_with_PySpark/Course_Notes/Spark_DataFrame_Project_Exercise/walmart_stock.csv",
    header=True,
    inferSchema=True,
)

In [39]:
# Preview the Walmart data as a text table.
df.show()

+--------------------+------------------+------------------+------------------+------------------+--------+------------------+
|                Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+--------------------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03 00:00:...|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:...|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:...|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:...|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:...|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.6

In [40]:
# Check the inferred column types of the Walmart data.
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [41]:
# head(5) returns the first 5 rows as a plain Python list; the next cell prints them.
type(df.head(5))

list

In [42]:
# Print the first five rows, one Row per line.
top_rows = df.head(5)
for row in top_rows:
    print(row)

Row(Date=datetime.datetime(2012, 1, 3, 0, 0), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996)
Row(Date=datetime.datetime(2012, 1, 4, 0, 0), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475)
Row(Date=datetime.datetime(2012, 1, 5, 0, 0), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539)
Row(Date=datetime.datetime(2012, 1, 6, 0, 0), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922)
Row(Date=datetime.datetime(2012, 1, 9, 0, 0), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)


In [43]:
# Same preview through Spark SQL: register the DataFrame as the temporary view
# "walmart" (later cells query this view too), then fetch five rows.
df.createOrReplaceTempView("walmart")
sql_results = spark.sql("SELECT * FROM walmart LIMIT 5")
sql_results.show()

+--------------------+------------------+---------+---------+------------------+--------+------------------+
|                Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+--------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:...|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:...|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:...|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:...|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:...|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+--------------------+------------------+---------+---------+------------------+--------+------------------+



In [44]:
# Summary statistics (count, mean, stddev, min, max) for the Walmart data.
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

There are too many decimal places for mean and stddev in the describe() DataFrame. Format the numbers to show only two decimal places. Pay careful attention to the data types that .describe() returns; we didn't cover this exact formatting, but we covered something very similar.

In [45]:
# describe() returns every statistic as a string column, so the values have to
# be cast before they can be formatted as numbers.
df.describe().printSchema()

root
 |-- summary: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [46]:
# Keep the summary DataFrame and check that a string statistic casts cleanly to double.
descDf = df.describe()
descDf.select(descDf["Open"].cast("double")).printSchema()

root
 |-- Open: double (nullable = true)



In [47]:
from pyspark.sql.functions import format_number

# Cast the string statistics to double, then render them with two decimal places.
# Only Open, High and Low are formatted here.
descDf.select(
    "summary",
    format_number(descDf["Open"].cast("double"), 2).alias("Open2"),
    format_number(descDf["High"].cast("double"), 2).alias("High2"),
    format_number(descDf["Low"].cast("double"), 2).alias("Low2"),
).show()

+-------+--------+--------+--------+
|summary|   Open2|   High2|    Low2|
+-------+--------+--------+--------+
|  count|1,258.00|1,258.00|1,258.00|
|   mean|   72.36|   72.84|   71.92|
| stddev|    6.77|    6.77|    6.74|
|    min|   56.39|   57.06|   56.30|
|    max|   90.80|   90.97|   89.25|
+-------+--------+--------+--------+



Create a new dataframe with a column called HV Ratio that is the ratio of the High Price versus volume of stock traded for a day.

In [48]:
# HV ratio = High price divided by Volume, computed row by row.
(
    df.withColumn("HVratio", df["High"] / df["Volume"])
      .select("Date", "HVratio")
      .show()
)

+--------------------+--------------------+
|                Date|             HVratio|
+--------------------+--------------------+
|2012-01-03 00:00:...|4.819714653321546E-6|
|2012-01-04 00:00:...|6.290848613094555E-6|
|2012-01-05 00:00:...|4.669412994783916E-6|
|2012-01-06 00:00:...|7.367338463826307E-6|
|2012-01-09 00:00:...|8.915604778943901E-6|
|2012-01-10 00:00:...|8.644477436914568E-6|
|2012-01-11 00:00:...|9.351828421515645E-6|
|2012-01-12 00:00:...| 8.29141562102703E-6|
|2012-01-13 00:00:...|7.712212102001476E-6|
|2012-01-17 00:00:...|7.071764823529412E-6|
|2012-01-18 00:00:...|1.015495466386981E-5|
|2012-01-19 00:00:...|6.576354146362592...|
|2012-01-20 00:00:...| 5.90145296180676E-6|
|2012-01-23 00:00:...|8.547679455011844E-6|
|2012-01-24 00:00:...|8.420709512685392E-6|
|2012-01-25 00:00:...|1.041448341728929...|
|2012-01-26 00:00:...|8.316075414862431E-6|
|2012-01-27 00:00:...|9.721183814992126E-6|
|2012-01-30 00:00:...|8.029436027707578E-6|
|2012-01-31 00:00:...|6.30743225

What day had the Peak High in Price?

In [49]:
# Sort by High, largest first; the peak day is the first row of the output.
df.orderBy("High", ascending=False).select("Date", "High").show()

+--------------------+-----------------+
|                Date|             High|
+--------------------+-----------------+
|2015-01-13 00:00:...|        90.970001|
|2015-01-08 00:00:...|90.66999799999999|
|2015-01-09 00:00:...|        90.389999|
|2015-01-12 00:00:...|        90.309998|
|2015-01-23 00:00:...|        89.260002|
|2015-01-26 00:00:...|        89.160004|
|2015-01-07 00:00:...|            88.68|
|2015-01-14 00:00:...|        88.519997|
|2015-01-27 00:00:...|        88.459999|
|2015-01-22 00:00:...|        88.400002|
|2015-01-28 00:00:...|        88.230003|
|2014-11-28 00:00:...|        88.089996|
|2015-02-06 00:00:...|             88.0|
|2015-01-15 00:00:...|        87.779999|
|2015-01-29 00:00:...|        87.720001|
|2015-01-20 00:00:...|        87.699997|
|2015-01-16 00:00:...|        87.459999|
|2014-12-31 00:00:...|        87.440002|
|2015-02-10 00:00:...|        87.410004|
|2015-01-30 00:00:...|        87.360001|
+--------------------+-----------------+
only showing top

Mean of the Close column?

In [50]:
# Mean of Close over the whole DataFrame (no grouping); the result column is named avg(Close).
df.agg({'Close':'mean'}).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



What is the max and min of the Volume column?

In [51]:
# Largest and smallest Volume, queried through the "walmart" temporary view
# registered in an earlier cell.
sql_results = spark.sql("SELECT max(Volume), min(Volume) FROM walmart")
sql_results.show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



How many days was the Close lower than 60 dollars?

In [52]:
# Number of rows (trading days) whose Close is under 60.
df.where(df["Close"] < 60).count()

81

What percentage of the time was the High greater than 80 dollars? In other words, (Number of Days High>80)/(Total Days in the dataset).

In [53]:
# Number of distinct dates in the dataset.
df.select("Date").distinct().count()

1258

In [54]:
# Number of distinct dates on which High exceeded 80.
df.where(df["High"] > 80).select("Date").distinct().count()

115

In [55]:
# Percentage of trading days on which High exceeded 80 dollars.
# Each count is computed once and given a name, and distinct dates are counted
# with select().distinct() rather than a throw-away groupBy().count() aggregate.
days_high_above_80 = df.filter(df["High"] > 80).select("Date").distinct().count()
total_days = df.select("Date").distinct().count()
# 100.0 forces true division, so the percentage is not truncated under Python 2.
100.0 * days_high_above_80 / total_days

9.141494435612083

What is the Pearson correlation between High and Volume?

In [56]:
from pyspark.sql.functions import corr
# Correlation between High and Volume over all rows, returned as a one-row DataFrame.
df.select(corr("High", "Volume")).show()

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061737161|
+-------------------+



What is the max High per year?

In [57]:
# NOTE(review): format_number was already imported in an earlier cell, and of
# these names only `year` and `month` are used in the cells visible in this notebook.
from pyspark.sql.functions import format_number,dayofmonth,hour,dayofyear,month,year,weekofyear,date_format
# Extract the calendar year from Date and show it next to High.
df.select([year("Date").alias("year"), "High"]).show()

+----+------------------+
|year|              High|
+----+------------------+
|2012|         61.060001|
|2012|         60.349998|
|2012|         59.619999|
|2012|         59.450001|
|2012|         59.549999|
|2012|59.709998999999996|
|2012|         59.529999|
|2012|              60.0|
|2012|59.610001000000004|
|2012|60.110001000000004|
|2012|         60.029999|
|2012|             60.73|
|2012|             61.25|
|2012|             60.98|
|2012|              62.0|
|2012|61.610001000000004|
|2012|             61.84|
|2012|         61.119999|
|2012|             61.32|
|2012|             61.57|
+----+------------------+
only showing top 20 rows



In [58]:
# Reduce to (year, High), group by year, and take the largest High in each group.
highsPerYear = df.select(year("Date").alias("year"), "High")
grouped = highsPerYear.groupBy("year")
grouped.max("High").show()

+----+---------+
|year|max(High)|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+



What is the average Close for each Calendar Month? In other words, across all the years, what is the average Close price for Jan, Feb, Mar, etc; your result will have a value for each of these months.

In [59]:
# Reduce to (month, Close), group by calendar month across all years, and
# average Close per group; sorted so the months appear in order 1..12.
closePerMonth = df.select(month("Date").alias("month"), "Close")
grouped = closePerMonth.groupBy("month")
grouped.avg("Close").orderBy("month").show()

+-----+-----------------+
|month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+



Great job Alvaro!