In [37]:
def fibonacci_generator():
    a, b = 0, 1
    while True:
        print(a, b)
        yield a
        a, b = b, a + b

# Example usage:
fib_gen = fibonacci_generator()
for _ in range(10):
    print(next(fib_gen))


0 1
0
1 1
1
1 2
1
2 3
2
3 5
3
5 8
5
8 13
8
13 21
13
21 34
21
34 55
34


In [85]:
!pip install pyodbc



In [11]:
import findspark
findspark.init('C:\spark-3.3.2-bin-hadoop3')

In [12]:
import pyodbc
import pandas as pd
import pyspark
import datetime as dt
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import round, col, lit, array


In [13]:
conn = pyodbc.connect(
    "Driver={SQl Server};"
    "Server=DESKTOP-SC1ES0U\SQL_SERVER;"
    "Database=stock;"
    "Trusted_Connection=yes;"
)


In [14]:
curs = conn.cursor()

In [15]:
# See all the tables

tables = "select name from sys.tables;"
curs.execute(tables)
query_results = curs.fetchall()
print(query_results)

[('walmart_stock', )]


In [16]:
# Print 3 tables from walmart_stock table

curs.execute("select top 3 * from walmart_stock")

query_results = curs.fetchall()
query_results

[('2012-01-03', 59.970001220703125, 61.060001373291016, 59.869998931884766, 60.33000183105469, 12668800, 52.61923599243164),
 ('2012-01-04', 60.209999084472656, 60.349998474121094, 59.470001220703125, 59.709999084472656, 9593300, 52.07847595214844),
 ('2012-01-05', 59.349998474121094, 59.619998931884766, 58.369998931884766, 59.41999816894531, 12768200, 51.825538635253906)]

In [17]:
# Get the Description of table
curs = conn.cursor()

curs.execute("EXEC sp_columns 'walmart_stock'")
query_results = curs.fetchall()
column_names = [desc[0] for desc in curs.description]

df = pd.DataFrame([list(row) for row in query_results], columns=column_names)

df[["COLUMN_NAME","TYPE_NAME"]]

Unnamed: 0,COLUMN_NAME,TYPE_NAME
0,Date,date
1,Open,float
2,High,float
3,Low,float
4,Close,float
5,Volume,int
6,Adj_Close,float


In [18]:
# Get all data, and print the head

curs = conn.cursor()

curs.execute("select * from walmart_stock")
query_results = curs.fetchall()
column_names = [desc[0] for desc in curs.description]

df = pd.DataFrame([list(row) for row in query_results], columns=column_names)

df.head()

Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj_Close
0,2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619236
1,2012-01-04,60.209999,60.349998,59.470001,59.709999,9593300,52.078476
2,2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539
3,2012-01-06,59.419998,59.450001,58.869999,59.0,8069400,51.459221
4,2012-01-09,59.029999,59.549999,58.919998,59.18,6679300,51.616215


In [19]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1258 entries, 0 to 1257
Data columns (total 7 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   Date       1258 non-null   object 
 1   Open       1258 non-null   float64
 2   High       1258 non-null   float64
 3   Low        1258 non-null   float64
 4   Close      1258 non-null   float64
 5   Volume     1258 non-null   int64  
 6   Adj_Close  1258 non-null   float64
dtypes: float64(5), int64(1), object(1)
memory usage: 68.9+ KB


In [20]:
df=df.astype({'Date':'datetime64'})
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1258 entries, 0 to 1257
Data columns (total 7 columns):
 #   Column     Non-Null Count  Dtype         
---  ------     --------------  -----         
 0   Date       1258 non-null   datetime64[ns]
 1   Open       1258 non-null   float64       
 2   High       1258 non-null   float64       
 3   Low        1258 non-null   float64       
 4   Close      1258 non-null   float64       
 5   Volume     1258 non-null   int64         
 6   Adj_Close  1258 non-null   float64       
dtypes: datetime64[ns](1), float64(5), int64(1)
memory usage: 68.9 KB


In [21]:
# Create a SparkContext

sc = pyspark.SparkContext()
sc

In [22]:
# Create a SparkSession
ss = SparkSession.builder.getOrCreate()


In [23]:
# Define the schema for the Spark DataFrame

schema = StructType([
    StructField("Date", DateType()),
    StructField("Open", DoubleType()),
    StructField("High", DoubleType()),
    StructField("Low", DoubleType()),
    StructField("Close", DoubleType()),
    StructField("Volume", IntegerType()),
    StructField("Adj_Close", DoubleType())
])

In [24]:
sdf = ss.createDataFrame(df, schema)

  for column, series in pdf.iteritems():


In [25]:
type(sdf)

pyspark.sql.dataframe.DataFrame

In [26]:
sdf.head()

Row(Date=datetime.date(2012, 1, 3), Open=59.970001220703125, High=61.060001373291016, Low=59.869998931884766, Close=60.33000183105469, Volume=12668800, Adj_Close=52.61923599243164)

In [27]:
sdf.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj_Close: double (nullable = true)



In [28]:
#Convert Spark DataFrame to RDD

rdd = sdf.rdd

In [29]:
type(rdd)

pyspark.rdd.RDD

In [30]:
rdd.take(3)

[Row(Date=datetime.date(2012, 1, 3), Open=59.970001220703125, High=61.060001373291016, Low=59.869998931884766, Close=60.33000183105469, Volume=12668800, Adj_Close=52.61923599243164),
 Row(Date=datetime.date(2012, 1, 4), Open=60.209999084472656, High=60.349998474121094, Low=59.470001220703125, Close=59.709999084472656, Volume=9593300, Adj_Close=52.07847595214844),
 Row(Date=datetime.date(2012, 1, 5), Open=59.349998474121094, High=59.619998931884766, Low=58.369998931884766, Close=59.41999816894531, Volume=12768200, Adj_Close=51.825538635253906)]

## scenario 1:  
### print out first 5 columns ?

## scenario2:
### Format all values in the "describe()" dataframe to show up to two decimal places.


In [31]:
x = sdf.describe()
rounded_cols = [round(col(col_name), 2).alias(col_name) if col_name != "summary" else col(col_name) for col_name in x.columns]
y = x.select(*rounded_cols)
print("Rounded Description:")
y.show()

Rounded Description:
+-------+------+------+------+------+----------+---------+
|summary|  Open|  High|   Low| Close|    Volume|Adj_Close|
+-------+------+------+------+------+----------+---------+
|  count|1258.0|1258.0|1258.0|1258.0|    1258.0|   1258.0|
|   mean| 72.36| 72.84| 71.92| 72.39|8222093.48|    67.24|
| stddev|  6.77|  6.77|  6.74|  6.76|4519780.84|     6.72|
|    min| 56.39| 57.06|  56.3| 56.42| 2094900.0|    50.36|
|    max|  90.8| 90.97| 89.25| 90.47| 8.08981E7|    84.91|
+-------+------+------+------+------+----------+---------+



## scenario3:

### Create a new dataframe with a column called "HV Ratio," which represents the ratio of the High Price versus the volume of stock traded for a day.


In [32]:
print("Starting 2 rows with HV:")
sdf.withColumn("HV", sdf.High/sdf.Volume).select('Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'HV').show(2)

Starting 2 rows with HV:
+----------+------------------+------------------+------------------+------------------+--------+--------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|                  HV|
+----------+------------------+------------------+------------------+------------------+--------+--------------------+
|2012-01-03|59.970001220703125|61.060001373291016|59.869998931884766| 60.33000183105469|12668800|4.819714682786927E-6|
|2012-01-04|60.209999084472656|60.349998474121094|59.470001220703125|59.709999084472656| 9593300|6.290848662516662E-6|
+----------+------------------+------------------+------------------+------------------+--------+--------------------+
only showing top 2 rows



In [155]:
61.060001373291016/12668800

4.819714682786927e-06

## scenario4: 
### On which day did the stock reach its peak high price?

In [33]:
rdd1 = rdd.max(lambda row: row['High'])

print(f"The Peak High Price was {rdd1['High']} and the date was {rdd1['Date']}.")

The Peak High Price was 90.97000122070312 and the date was 2015-01-13.


## scenario5:

### What is the mean value of the "Close" column?


In [34]:
rdd2 = rdd.map(lambda row: row['Close']).mean()
print("The mean of Close column is:", rdd2)

The mean of Close column is: 72.38844997363552


## scenario6:

### What are the maximum and minimum values of the "Volume" column?¶


In [49]:
rdd3 = rdd.max(lambda row: row['Volume'])
rdd4 = rdd.min(lambda row: row['Volume'])

print("The max of Volume is:", rdd3['Volume'])
print("The min of Volume is:", rdd4['Volume'])

The max of Volume is: 80898100
The min of Volume is: 2094900


## scenario7:

### How many days did the "Close" column have a value lower than $60?


In [63]:
rdd5 = rdd.filter(lambda row: row['Close']<60).count()
print(rdd5, "Days are have clossing value is less then 60.")

81 Days are have clossing value is less then 60.


## Scenario8:

### What percentage of the time did the "High" column have a value greater than $80?

In [68]:
rdd6 = rdd.filter(lambda row: row['High']>80).count()/rdd.count()

print(round(rdd6,2)*100, "% of time the High value is greater then 80 Dollars.")

9.0 % of time the High value is greater then 80 Dollars.


## Scenario9:

### What is the maximum value of the "High" column for each year in the dataset?

In [85]:
rdd7 = rdd.map(lambda row: (row['Date'].year,row['High'])).reduceByKey(max)
print("Max High of each Year is:")
rdd7.collect()

Max High of each Year is:


[(2016, 75.19000244140625),
 (2012, 77.5999984741211),
 (2013, 81.37000274658203),
 (2014, 88.08999633789062),
 (2015, 90.97000122070312)]