
# **Running PySpark in Colab**

To run Spark in Colab, first install its dependencies in the Colab environment: Apache Spark 2.4.5 built for Hadoop 2.7, Java 8, and findspark, which locates the Spark installation on the system. All of these can be installed from inside the Colab notebook itself.
Follow these steps to install the dependencies:

In [0]:
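# Java 8 (headless JDK), installed quietly
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Spark 2.4.5 built for Hadoop 2.7; if this URL no longer resolves, older releases are usually kept under archive.apache.org/dist/spark/
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
# findspark, so Python can locate the Spark installation
!pip install -q findspark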

Now that Spark and Java are installed in Colab, set the environment variables that let PySpark find them. Set the locations of Java and Spark by running the following code:

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
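The two paths above must match the JDK that apt installed and the directory that tar unpacked. The following is a minimal sketch, assuming the same version numbers as the install step and Colab's default working directory `/content`. The helper `missing_dirs` and the constant names are hypothetical, not part of Spark or findspark. It derives the Spark directory name from the version strings and reports which configured directories do not exist:

```python
import os

# Assumed versions, taken from the install step above.
SPARK_VERSION = "2.4.5"
HADOOP_VERSION = "2.7"

# Name of the directory that `tar xf` unpacks from the downloaded archive.
spark_dir = f"spark-{SPARK_VERSION}-bin-hadoop{HADOOP_VERSION}"

expected = {
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": os.path.join("/content", spark_dir),
}

def missing_dirs(paths):
    """Return the names whose path is not an existing directory."""
    return [name for name, path in paths.items() if not os.path.isdir(path)]

print(missing_dirs(expected))
```

An empty list means both directories exist. Any name in the list points to a path that needs correcting before calling `findspark.init()`.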

Run a local Spark session to test your installation:

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
import pyspark
print(pyspark.__version__)

2.4.5


In [5]:
!ls

sample_data  spark-2.4.5-bin-hadoop2.7	spark-2.4.5-bin-hadoop2.7.tgz


In [7]:
from google.colab import files
file = files.upload()

Saving walmart_stock.csv to walmart_stock.csv


In [8]:
!ls

sample_data		   spark-2.4.5-bin-hadoop2.7.tgz
spark-2.4.5-bin-hadoop2.7  walmart_stock.csv


#### Use the walmart_stock.csv file to answer the questions and complete the tasks below!

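Start a simple Spark session. Because a session is already running from the installation test, getOrCreate() returns that existing session.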

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('walmart').getOrCreate()

In [10]:
spark

Load the Walmart stock CSV file and have Spark infer the data types.

In [11]:
df = spark.read.csv('walmart_stock.csv', inferSchema=True, header=True)
type(df)

pyspark.sql.dataframe.DataFrame

In [12]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [13]:
df.show()

+-------------------+------------------+------------------+------------------+------------------+--------+------------------+
|               Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:00|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.616215000

In [14]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [15]:
df.head(5)

[Row(Date=datetime.datetime(2012, 1, 3, 0, 0), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996),
 Row(Date=datetime.datetime(2012, 1, 4, 0, 0), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475),
 Row(Date=datetime.datetime(2012, 1, 5, 0, 0), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539),
 Row(Date=datetime.datetime(2012, 1, 6, 0, 0), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922),
 Row(Date=datetime.datetime(2012, 1, 9, 0, 0), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)]

Use describe() to learn about the DataFrame.

In [16]:
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+

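The mean and stddev values in the describe() DataFrame have too many decimal places. Format the numbers to show only two decimal places. Note that describe() returns its columns as strings, so they must be cast to a numeric type before formatting.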

In [17]:
from pyspark.sql.functions import format_number
df.describe().printSchema()

root
 |-- summary: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [18]:
result = df.describe()
result.select(result['summary'],
              format_number(result['Open'].cast('float'), 2).alias('Open'),
              format_number(result['High'].cast('float'), 2).alias('High'),
              format_number(result['Low'].cast('float'), 2).alias('Low'),
              format_number(result['Close'].cast('float'), 2).alias('Close'),
              format_number(result['Volume'].cast('float'), 2).alias('Volume'),
              format_number(result['Adj Close'].cast('float'), 2).alias('Adj Close')).show()

+-------+--------+--------+--------+--------+-------------+---------+
|summary|    Open|    High|     Low|   Close|       Volume|Adj Close|
+-------+--------+--------+--------+--------+-------------+---------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|     1,258.00| 1,258.00|
|   mean|   72.36|   72.84|   71.92|   72.39| 8,222,093.50|    67.24|
| stddev|    6.77|    6.77|    6.74|    6.76| 4,519,781.00|     6.72|
|    min|   56.39|   57.06|   56.30|   56.42| 2,094,900.00|    50.36|
|    max|   90.80|   90.97|   89.25|   90.47|80,898,096.00|    84.91|
+-------+--------+--------+--------+--------+-------------+---------+
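The Volume max prints as 80,898,096.00 rather than 80,898,100.00 because cast('float') converts to a 32-bit float, which cannot represent every integer of that size. Here is a small standard-library sketch of the same rounding, using `struct` to round-trip a value through 32 bits (the helper name `to_float32` is hypothetical):

```python
import struct

def to_float32(x):
    """Round a Python float (64-bit) to the nearest 32-bit float value."""
    return struct.unpack("f", struct.pack("f", x))[0]

print(to_float32(80898100.0))         # 80898096.0, the max Volume shown above
print(to_float32(8222093.481717011))  # 8222093.5, the mean Volume shown above
```

Casting with cast('double') instead keeps 64-bit precision, so the Volume figures would not be rounded this way.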



#### Create a new DataFrame with a column called HV Ratio that is the ratio of the High price to the Volume of stock traded that day.

In [19]:
df.withColumn('HV Ratio', df['High'] / df['Volume']).show()

+-------------------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|               Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|            HV Ratio|
+-------------------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|2012-01-03 00:00:00|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|4.819714653321546E-6|
|2012-01-04 00:00:00|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|6.290848613094555E-6|
|2012-01-05 00:00:00|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|4.669412994783916E-6|
|2012-01-06 00:00:00|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51
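As a quick sanity check on the first row, the same division can be done in plain Python with the High and Volume values shown in the table. Note that withColumn returns a new DataFrame rather than modifying df, so the result has to be assigned to a name if it is needed later. This sketch only reproduces the arithmetic:

```python
# Values from the 2012-01-03 row of the table above.
high = 61.060001
volume = 12668800

hv_ratio = high / volume
print(hv_ratio)  # about 4.82e-06, matching the HV Ratio column
```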

What day had the Peak High in Price?

In [50]:
# Sorting by High in descending order puts the peak first; head(1) returns a one-element list of Rows:
# df.orderBy(df['High'].desc()).head(1)
# [Row(Date=datetime.datetime(2015, 1, 13, 0, 0), Open=90.800003, High=90.970001, Low=88.93, Close=89.309998, Volume=8215400, Adj Close=83.825448)]

# Indexing with [0] selects that Row:
# df.orderBy(df['High'].desc()).head(1)[0]
# Row(Date=datetime.datetime(2015, 1, 13, 0, 0), Open=90.800003, High=90.970001, Low=88.93, Close=89.309998, Volume=8215400, Adj Close=83.825448)

# A second [0] selects the Row's first field, Date.
df.orderBy(df['High'].desc()).head(1)[0][0]

datetime.datetime(2015, 1, 13, 0, 0)

#### What is the mean of the Close column?

In [57]:
df.agg({'Close':'mean'}).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



In [59]:
from pyspark.sql.functions import mean
df.select(mean('Close')).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



In [62]:
df.select(format_number(mean('Close'), 2)).show()

+----------------------------+
|format_number(avg(Close), 2)|
+----------------------------+
|                       72.39|
+----------------------------+



In [63]:
df.select(format_number(mean('Close'), 2).alias('Avg')).show()

+-----+
|  Avg|
+-----+
|72.39|
+-----+



In [74]:
df.describe('Close').show()

+-------+-----------------+
|summary|            Close|
+-------+-----------------+
|  count|             1258|
|   mean|72.38844998012726|
| stddev|6.756859163732991|
|    min|        56.419998|
|    max|        90.470001|
+-------+-----------------+



#### What is the max and min of the Volume column?

In [0]:
# Note: these imports shadow Python's built-in max and min for the rest of the notebook.
from pyspark.sql.functions import max, min


In [78]:
df.select(max('Volume'), min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



On how many days was the Close lower than 60 dollars?