In [1]:
# import required packages

import pyodbc
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, IntegerType, StructType, StringType,FloatType,DateType
from pyspark.sql.functions import lit,max,mean,min, first,desc,col,format_number

# Declaring Path and Variables

# driver = "ODBC Driver 18 for SQL Server"
driver = "SQL Server"
server = "DESKTOP-TOEPTEF\SQL_SERVER"
port = 1433

# table Name
table_name = "walmart_stock"

# databse_name
db_name='walmart_stock_analysis'

In [2]:
# Let's build a connection string

conn_url = f'DRIVER={driver};Server={server};Port={port}'
conn = pyodbc.connect(conn_url)

In [3]:
curs = conn.cursor()

In [4]:
use_db = f"use {db_name}"
curs.execute(use_db)
curs.commit()

In [5]:
curs.execute(
"""select
* from 
walmart_stock
""")

query_results = curs.fetchall()

In [6]:
# Let's check top 3

query_results[:3]

[('2012-01-03', 59.970001220703125, 61.060001373291016, 59.869998931884766, 60.33000183105469, 12668800, 52.61923599243164),
 ('2012-01-04', 60.209999084472656, 60.349998474121094, 59.470001220703125, 59.709999084472656, 9593300, 52.07847595214844),
 ('2012-01-05', 59.349998474121094, 59.619998931884766, 58.369998931884766, 59.41999816894531, 12768200, 51.825538635253906)]

In [7]:
curs.commit()

**-- PySpark --**

In [8]:
# Now create spark context

sc = pyspark.SparkContext()
sc

In [9]:
walmartrdd = sc.parallelize(query_results)

In [10]:
# Let's see the data.

walmartrdd.take(2)

[('2012-01-03', 59.970001220703125, 61.060001373291016, 59.869998931884766, 60.33000183105469, 12668800, 52.61923599243164),
 ('2012-01-04', 60.209999084472656, 60.349998474121094, 59.470001220703125, 59.709999084472656, 9593300, 52.07847595214844)]

**-- RDD --**

### **Scenario 7: How many days was the Close lower than 60 dollars?**

In [11]:


walmartrdd2 = walmartrdd.map(lambda line:(line[0],int(line[4])))\
.filter(lambda item:float(item[1]) < 60)\
.map(lambda x: (x[0],1))\
.count()


In [12]:
print(f"{walmartrdd2} days was the close lower than 60 dollars.")

81 days was the close lower than 60 dollars.


### Scenario 8: What percentage of the time was the High greater than 80 dollars ?


In [13]:
walmartrdd3 = (walmartrdd.map(lambda line:(line[0],int(line[2])))\
.filter(lambda item:float(item[1]) > 80)\
.map(lambda x: (x[0],1))\
.count())/walmartrdd.count()*100

In [14]:
print(f"{round(walmartrdd3, 2)} % of the time was the high greater than 80 dollars.")


8.43 % of the time was the high greater than 80 dollars.


### Scenario 9: What is the max High per year?

In [15]:
walmartrdd4 = walmartrdd.map(lambda x: (int(x[0].split('-')[0]), x[2]))\
.reduceByKey(lambda a, b: round(a,2) if a>b else round(b,2))

In [16]:
print("Max High Per Year: %s"%walmartrdd4.collect())

Max High Per Year: [(2016, 75.19), (2012, 77.6), (2013, 81.37), (2014, 88.09), (2015, 90.97)]


**-- DSL & SparkSQL --**

In [17]:
ss = SparkSession.builder.appName("project").getOrCreate()

In [18]:
ss

In [19]:
# Let's define schema

data_schema = [StructField('Date',DateType(),True),
               StructField('Open',FloatType(),True),
               StructField('High',FloatType(),True),
               StructField('Low',FloatType(),True),
               StructField('Close',FloatType(),True),
              StructField('Volume',IntegerType(),True),
              StructField('Adj Close',FloatType(),True),]

In [20]:
# Now create a structtype with the schema as field

df_schema = StructType(fields = data_schema)

In [21]:
df = ss.read.csv(r'C:\Users\Futurense\Desktop\Walmart Project DE\Project documents\HIVE WALMART STOCK\walmart_stock.csv', schema = df_schema, header =True)

In [22]:
df.show()

+----------+-----+-----+-----+-----+--------+---------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close|
+----------+-----+-----+-----+-----+--------+---------+
|2012-01-03|59.97|61.06|59.87|60.33|12668800|52.619236|
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|52.078476|
|2012-01-05|59.35|59.62|58.37|59.42|12768200| 51.82554|
|2012-01-06|59.42|59.45|58.87| 59.0| 8069400| 51.45922|
|2012-01-09|59.03|59.55|58.92|59.18| 6679300|51.616215|
|2012-01-10|59.43|59.71|58.98|59.04| 6907300| 51.49411|
|2012-01-11|59.06|59.53|59.04| 59.4| 6365600|51.808098|
|2012-01-12|59.79| 60.0| 59.4| 59.5| 7236400|51.895317|
|2012-01-13|59.18|59.61|59.01|59.54| 7729300|51.930202|
|2012-01-17|59.87|60.11|59.52|59.85| 8500000| 52.20058|
|2012-01-18|59.79|60.03|59.65|60.01| 5911400| 52.34013|
|2012-01-19|59.93|60.73|59.75|60.61| 9234600|52.863445|
|2012-01-20|60.75|61.25|60.67|61.01|10378800|53.212322|
|2012-01-23|60.81|60.98|60.51|60.91| 7134100|53.125103|
|2012-01-24|60.75| 62.0|60.75|61.39| 7362800|53.

In [23]:
# Let's create view for doing spark-sql programm 

df.createOrReplaceTempView("walmartstock")

In [24]:
# Let's do the printschema

df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- Close: float (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: float (nullable = true)



#### Scenario 2:There are too many decimal places for mean and stddev in the describe() dataframe. Format the numbers to just show up to two decimal places. Pay careful attention to the datatypes that .describe() returns, we didn't cover how to do this exact formatting, but we covered something very similar.

In [25]:
df.describe().show()

+-------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|             Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|             1258|             1258|             1258|             1258|             1258|             1258|
|   mean|72.35785375452572| 72.8393880756178|71.91860094964979|72.38844997363553|8222093.481717011|67.23883840200064|
| stddev|6.768090251767697|6.768186825250206|6.744075739203606|6.756859160119612|  4519780.8431556|6.722609385249684|
|    min|            56.39|            57.06|             56.3|            56.42|          2094900|         50.36369|
|    max|             90.8|            90.97|            89.25|            90.47|         80898100|        84.914215|
+-------+-----------------+-----------------+-----------

In [26]:
df.describe().withColumn("Open", format_number(col("Open").cast('float'),2))\
.withColumn("High", format_number(col("High").cast('float'),2))\
.withColumn("Low", format_number(col("Low").cast('float'),2))\
.withColumn("Close", format_number(col("Close").cast('float'),2))\
.withColumn("Volume", format_number(col("Volume").cast('float'),2))\
.withColumn("Adj Close", format_number(col("Adj Close").cast('float'),2))\
.show()

+-------+--------+--------+--------+--------+-------------+---------+
|summary|    Open|    High|     Low|   Close|       Volume|Adj Close|
+-------+--------+--------+--------+--------+-------------+---------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|     1,258.00| 1,258.00|
|   mean|   72.36|   72.84|   71.92|   72.39| 8,222,093.50|    67.24|
| stddev|    6.77|    6.77|    6.74|    6.76| 4,519,781.00|     6.72|
|    min|   56.39|   57.06|   56.30|   56.42| 2,094,900.00|    50.36|
|    max|   90.80|   90.97|   89.25|   90.47|80,898,096.00|    84.91|
+-------+--------+--------+--------+--------+-------------+---------+



#### Scenario 3: Create a new dataframe with a column called VH Ratio that is the ratio of the Volume Price VS High of stock traded for a day.?

In [51]:
# Spark - SQL Technique

ss.sql("select *, round(cast(Volume as float)/cast(High as float),2) as HV_Ratio from walmartstock").show()

+----------+-----+-----+-----+-----+--------+---------+---------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close| HV_Ratio|
+----------+-----+-----+-----+-----+--------+---------+---------+
|2012-01-03|59.97|61.06|59.87|60.33|12668800|52.619236|207481.16|
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|52.078476|158961.06|
|2012-01-05|59.35|59.62|58.37|59.42|12768200| 51.82554|214159.68|
|2012-01-06|59.42|59.45|58.87| 59.0| 8069400| 51.45922|135734.23|
|2012-01-09|59.03|59.55|58.92|59.18| 6679300|51.616215|112162.89|
|2012-01-10|59.43|59.71|58.98|59.04| 6907300| 51.49411|115680.79|
|2012-01-11|59.06|59.53|59.04| 59.4| 6365600|51.808098|106930.96|
|2012-01-12|59.79| 60.0| 59.4| 59.5| 7236400|51.895317|120606.67|
|2012-01-13|59.18|59.61|59.01|59.54| 7729300|51.930202|129664.48|
|2012-01-17|59.87|60.11|59.52|59.85| 8500000| 52.20058|141407.42|
|2012-01-18|59.79|60.03|59.65|60.01| 5911400| 52.34013|  98474.1|
|2012-01-19|59.93|60.73|59.75|60.61| 9234600|52.863445|152059.94|
|2012-01-2

In [48]:
# DSL Technique

df.withColumn('HV Ratio',round(lit(col('Volume').cast('float')/col('High').cast('float')),2)).show()

+----------+-----+-----+-----+-----+--------+---------+---------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close| HV Ratio|
+----------+-----+-----+-----+-----+--------+---------+---------+
|2012-01-03|59.97|61.06|59.87|60.33|12668800|52.619236|207481.16|
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|52.078476|158961.06|
|2012-01-05|59.35|59.62|58.37|59.42|12768200| 51.82554|214159.68|
|2012-01-06|59.42|59.45|58.87| 59.0| 8069400| 51.45922|135734.23|
|2012-01-09|59.03|59.55|58.92|59.18| 6679300|51.616215|112162.89|
|2012-01-10|59.43|59.71|58.98|59.04| 6907300| 51.49411|115680.79|
|2012-01-11|59.06|59.53|59.04| 59.4| 6365600|51.808098|106930.96|
|2012-01-12|59.79| 60.0| 59.4| 59.5| 7236400|51.895317|120606.67|
|2012-01-13|59.18|59.61|59.01|59.54| 7729300|51.930202|129664.48|
|2012-01-17|59.87|60.11|59.52|59.85| 8500000| 52.20058|141407.42|
|2012-01-18|59.79|60.03|59.65|60.01| 5911400| 52.34013|  98474.1|
|2012-01-19|59.93|60.73|59.75|60.61| 9234600|52.863445|152059.94|
|2012-01-2

#### Scenario 4: What day had the Peak High in Price?

In [28]:
# Spark-SQL techinique

ss.sql("select Date, High from walmartstock order by High desc limit 1").show()



+----------+-----+
|      Date| High|
+----------+-----+
|2015-01-13|90.97|
+----------+-----+



In [29]:
# DSL

df.orderBy(desc('High')).select("Date", "High").limit(1).show()

+----------+-----+
|      Date| High|
+----------+-----+
|2015-01-13|90.97|
+----------+-----+



#### Scenario 5: What is the mean of the Close column?

In [30]:
from pyspark.sql.functions import round

In [31]:
# Spark - SQL Technique

ss.sql("select round(avg(Close),2) as mean_close from walmartstock").show()

+----------+
|mean_close|
+----------+
|     72.39|
+----------+



In [32]:
# DSL Technique

df.select(round(mean(df.Close),2).alias("Mean_Close")).show()

+----------+
|Mean_Close|
+----------+
|     72.39|
+----------+



#### Scenario 6: What is the max and min of the Volume column?

In [33]:
# Spark - Sql Technique

ss.sql("select round(max(Volume)) as max_volume, round(min(Volume)) as min_volume from walmartstock").show()

+----------+----------+
|max_volume|min_volume|
+----------+----------+
|  80898100|   2094900|
+----------+----------+



In [34]:
# DSL Technique

df.select(round(max(df.Volume),2).alias("Max_Volume"), round(min(df.Volume),2).alias("Min_Volume")).show()

+----------+----------+
|Max_Volume|Min_Volume|
+----------+----------+
|  80898100|   2094900|
+----------+----------+

