# Project Step 2
### B. Stock Market Data Analysis
In this step you will use Spark RDDs to analyze and summarize the dataset you stored on HDFS in step 1.B of the previous project. Use RDDs and their suitable operations to provide the following summaries.

### Data Overview
- stock: The stock symbol (e.g., AAPL for Apple, MSFT for Microsoft).
- datetime: The date and time at which the stock data was recorded.
- volume: The number of shares traded during the time period, a measure of trading activity for the stock.
- open: The price of the stock at the beginning of the time period.
- high: The highest price at which the stock traded during the time period.
- low: The lowest price at which the stock traded during the time period.
- close: The price of the stock at the end of the time period.
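To make the column list concrete, here is a minimal plain-Python sketch that turns one raw CSV line into a typed record, assuming the column order of the header row shown below (stock, datetime, volume, open, high, low, close). `StockRecord` and `parse_line` are hypothetical names used only for illustration; the RDD cells below keep the fields as strings and convert them only where needed.

```python
from collections import namedtuple
from datetime import datetime

# Hypothetical record type mirroring the seven CSV columns, in header order
StockRecord = namedtuple(
    "StockRecord", ["stock", "datetime", "volume", "open", "high", "low", "close"]
)

def parse_line(line):
    """Split one CSV line on commas and convert each field to a Python type."""
    stock, dt, volume, open_, high, low, close = line.split(",")
    return StockRecord(
        stock=stock,
        datetime=datetime.strptime(dt, "%Y-%m-%d %H:%M:%S"),
        volume=int(volume),
        open=float(open_),
        high=float(high),
        low=float(low),
        close=float(close),
    )

# Sample line copied from the first data row of the dataset
record = parse_line("AAPL,2024-10-21 12:28:00,484,236.27000,236.27000,236.26000,236.26000")
print(record)
```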


In [1]:
#path to hdfs
path = '/training/flume/stock_csv.csv'

In [2]:
# Load the CSV file into an RDD
stock_rdd = sc.textFile(path)

In [3]:
# Print the first three lines of the initial RDD
stock_rdd.take(3)

[u'stock,datetime,volume,open,high,low,close',
 u'AAPL,2024-10-21 12:28:00,484,236.27000,236.27000,236.26000,236.26000',
 u'IBM,2024-10-21 12:25:00,120,231.53000,231.53000,231.53000,231.53000']

In [4]:
header = stock_rdd.first()  # Get the first row (header)
stock_rdd = stock_rdd.filter(lambda row: row != header)  # Remove the header
stock_rddNH = stock_rdd.map(lambda line: line.split(","))  # Split each row by comma


print("First 3 rows after splitting by comma:")
print(stock_rddNH.take(3))

First 3 rows after splitting by comma:
[[u'AAPL', u'2024-10-21 12:28:00', u'484', u'236.27000', u'236.27000', u'236.26000', u'236.26000'], [u'IBM', u'2024-10-21 12:25:00', u'120', u'231.53000', u'231.53000', u'231.53000', u'231.53000'], [u'GOOGL', u'2024-10-21 12:28:00', u'346', u'163.53500', u'163.53500', u'163.50000', u'163.50000']]


In [5]:
# Function to safely convert a string to a float, return None if invalid
def safe_float(value):
    try:
        return float(value)
    except ValueError:
        return None  # Return None if the value cannot be converted to a float

## 1) How many records are there in the table?

In [6]:
total_records_rdd = stock_rddNH.count()
print("Total records in the table: {}".format(total_records_rdd))

Total records in the table: 3110


## 2) How many different days are there in the table?

In [7]:
# 2. How many different days are there in the table?
# Extract the date part from 'datetime' (assumes datetime format is 'YYYY-MM-DD HH:MM:SS')
distinct_days_rdd = stock_rddNH.map(lambda row: row[1].split(" ")[0]).distinct()
print("Number of different days: {}".format(distinct_days_rdd.count()))

Number of different days: 5


## 3) How many records are there for each day?

In [8]:
# 3. How many records are there for each day?
records_per_day_rdd = stock_rddNH.map(lambda row: (row[1].split(" ")[0], 1)) \
                                 .reduceByKey(lambda a, b: a + b)
print("Count of Records Per Day: {}".format(records_per_day_rdd.take(5)))

Count of Records Per Day: [(u'2024-10-30', 75), (u'2024-10-24', 800), (u'2024-10-23', 635), (u'2024-10-22', 800), (u'2024-10-21', 800)]
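The `(key, 1)` plus `reduceByKey` pattern in this cell is the classic distributed count. The sketch below reproduces the same logic in plain Python on a few rows shaped like `stock_rddNH` (illustrative rows modelled on this dataset), and sorts the result by date, which `reduceByKey` does not guarantee; on the RDD, calling `.sortByKey()` before `take` would have the same effect.

```python
from collections import defaultdict

# Illustrative rows in the same shape as stock_rddNH:
# [stock, datetime, volume, open, high, low, close]
rows = [
    ["AAPL", "2024-10-22 14:54:00", "1097", "235.325", "235.35", "235.32", "235.325"],
    ["IBM", "2024-10-21 12:25:00", "120", "231.53", "231.53", "231.53", "231.53"],
    ["AAPL", "2024-10-21 12:28:00", "484", "236.27", "236.27", "236.26", "236.26"],
]

# map step: emit (date, 1) for every row
pairs = [(row[1].split(" ")[0], 1) for row in rows]

# reduceByKey step: add up the 1s for each date
counts = defaultdict(int)
for day, one in pairs:
    counts[day] += one

# Sort by date so the output order is stable
records_per_day = sorted(counts.items())
print(records_per_day)  # [('2024-10-21', 2), ('2024-10-22', 1)]
```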


## 4) What are the symbols in the table?


In [9]:
# 4. What are the symbols in the table?
symbols_rdd = stock_rddNH.map(lambda row: row[0]).distinct()
print("First 3 symbols in the table: {}".format(symbols_rdd.take(3)))

First 3 symbols in the table: [u'MSFT', u'AAPL', u'IBM']


## 5) What is the highest price for each symbol?


In [10]:
# 5. What is the highest price for each symbol?
price_per_symbol_rdd = stock_rddNH.map(lambda row: (row[0], safe_float(row[4])))  # Extract (symbol, high price)
price_per_symbol_rdd = price_per_symbol_rdd.filter(lambda x: x[1] is not None)  # Filter invalid entries
highest_price_per_symbol_rdd = price_per_symbol_rdd.reduceByKey(lambda a, b: max(a, b))
print("First 3 highest prices per symbol: {}".format(highest_price_per_symbol_rdd.take(3)))

First 3 highest prices per symbol: [(u'MSFT', 434.91), (u'AAPL', 236.8), (u'IBM', 233.325)]


## 6) What is the lowest price for each symbol?


In [11]:
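# 6. What is the lowest price for each symbol?
# Use the 'low' column (index 5), not the 'high' column used in question 5
low_price_per_symbol_rdd = stock_rddNH.map(lambda row: (row[0], safe_float(row[5])))  # Extract (symbol, low price)
low_price_per_symbol_rdd = low_price_per_symbol_rdd.filter(lambda x: x[1] is not None)  # Filter invalid entries
lowest_price_per_symbol_rdd = low_price_per_symbol_rdd.reduceByKey(lambda a, b: min(a, b))
print("First 3 lowest prices per symbol: {}".format(lowest_price_per_symbol_rdd.take(3)))

First 3 lowest prices per symbol: [(u'MSFT', 414.92), (u'AAPL', 228.06), (u'IBM', 204.51)]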
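`reduceByKey` folds all values that share a key with a binary function, so passing `max` or `min` yields per-key extremes. A minimal local stand-in for that operation is sketched below; `reduce_by_key` is a hypothetical helper, not part of PySpark, and the `(symbol, (high, low))` pairs are illustrative. The highest price is taken from the `high` values and the lowest from the `low` values.

```python
from functools import reduce

def reduce_by_key(pairs, func):
    """Hypothetical local stand-in for RDD.reduceByKey: fold the values of each key with func."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}

# Illustrative (symbol, (high, low)) pairs
rows = [("AAPL", (236.27, 236.26)), ("AAPL", (235.35, 235.32)), ("IBM", (231.53, 231.53))]

highest = reduce_by_key([(s, hl[0]) for s, hl in rows], max)  # max over 'high'
lowest = reduce_by_key([(s, hl[1]) for s, hl in rows], min)   # min over 'low'
print(highest)
print(lowest)
```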


## 7) What is the average price for each symbol?

In [12]:
# 7. Average closing price per symbol: carry (sum, count) pairs, then divide
# Column 6 is 'close'; column 2 is 'volume' and must not be used as a price
avg_price_rdd = stock_rddNH.map(lambda row: (row[0], (float(row[6]), 1)))  # Map to (symbol, (close, 1))
total_and_count_rdd = avg_price_rdd.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))  # Sum prices and counts
avg_price_per_symbol = total_and_count_rdd.mapValues(lambda x: round(x[0] / x[1], 2))  # Average, rounded to 2 decimals
print("Average closing price for the first 3 symbols: {}".format(avg_price_per_symbol.take(3)))

Average closing price for the first 3 symbols: [(u'MSFT', 424.21), (u'AAPL', 233.32), (u'IBM', 227.56)]
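The average is computed by carrying `(sum, count)` pairs through the reduction and dividing only at the end. Averaging partial averages instead would weight each partition equally, which is wrong when partitions hold different numbers of rows. The plain-Python sketch below, using illustrative close prices split into two unequal "partitions", shows both the correct pattern and how the naive one differs.

```python
from functools import reduce

# Illustrative close prices for one symbol, split across two partitions of unequal size
partition_a = [236.0, 234.0, 232.0]
partition_b = [230.0]

def to_pair(price):
    # map step: (price, 1)
    return (price, 1)

def combine(a, b):
    # reduce step: add sums and counts element-wise (associative, so safe for reduceByKey)
    return (a[0] + b[0], a[1] + b[1])

total, count = reduce(combine, map(to_pair, partition_a + partition_b))
correct_avg = total / count  # 932 / 4 = 233.0

# Naive: average of the per-partition averages (biased when partition sizes differ)
naive_avg = (sum(partition_a) / len(partition_a) + sum(partition_b) / len(partition_b)) / 2
print("correct={} naive={}".format(correct_avg, naive_avg))
```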


## 8) What is the range of price for each symbol?


In [13]:
# 8. What is the range of price for each symbol?
# Range = highest 'high' price - lowest 'low' price
price_range_per_symbol_rdd = stock_rddNH.map(lambda row: (row[0], (safe_float(row[4]), safe_float(row[5]))))  # (symbol, (high, low))
price_range_per_symbol_rdd = price_range_per_symbol_rdd.filter(lambda x: x[1][0] is not None and x[1][1] is not None)
min_max_price_rdd = price_range_per_symbol_rdd.reduceByKey(lambda a, b: (max(a[0], b[0]), min(a[1], b[1])))
price_range_per_symbol_rdd = min_max_price_rdd.mapValues(lambda x: x[0] - x[1])  # Range = max - min
print("First 3 price ranges per symbol: {}".format(price_range_per_symbol_rdd.take(3)))

First 3 price ranges per symbol: [(u'MSFT', 19.99000000000001), (u'AAPL', 8.740000000000009), (u'IBM', 28.814999999999998)]
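Here the range is the highest `high` minus the lowest `low`, obtained from a single reduction over `(high, low)` pairs. Part B below uses `MAX(close) - MIN(close)` instead, which is why the two sets of ranges differ slightly. The plain-Python sketch below mirrors the RDD reduction and rounds the result to two decimals (a presentational assumption) to hide floating-point noise such as `19.99000000000001`; the `(high, low)` pairs are illustrative.

```python
from functools import reduce

# Illustrative (high, low) pairs for one symbol
high_low = [(236.27, 236.26), (236.8, 235.9), (228.5, 228.06)]

def merge(a, b):
    """Combine two (high, low) pairs into (max of highs, min of lows)."""
    return (max(a[0], b[0]), min(a[1], b[1]))

max_high, min_low = reduce(merge, high_low)
price_range = round(max_high - min_low, 2)
print(price_range)  # 8.74
```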


## 9) What is the date on which each symbol experienced the highest price?

In [14]:
# 9. What is the date on which each symbol experienced the highest price?
# We need to find the date corresponding to the highest price for each symbol
highest_price_date_per_symbol_rdd = stock_rddNH.map(lambda row: (row[0], (safe_float(row[4]), row[1])))  # (symbol, (high, date))
highest_price_date_per_symbol_rdd = highest_price_date_per_symbol_rdd.filter(lambda x: x[1][0] is not None)  # Filter invalid entries
highest_price_date_per_symbol_rdd = highest_price_date_per_symbol_rdd.reduceByKey(lambda a, b: a if a[0] > b[0] else b)
print("First 3 dates of highest prices per symbol: {}".format(highest_price_date_per_symbol_rdd.take(3)))

First 3 dates of highest prices per symbol: [(u'MSFT', (434.91, u'2024-10-30 15:46:00')), (u'AAPL', (236.8, u'2024-10-21 14:04:00')), (u'IBM', (233.325, u'2024-10-23 15:56:00'))]
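`reduceByKey(lambda a, b: a if a[0] > b[0] else b)` keeps a single record per symbol, so when the maximum is reached at several times (the Part B query later in this notebook finds two timestamps each for GOOGL, MSFT and TSLA), which one survives depends on the order in which Spark combines values. If every tied timestamp is wanted, one option is to reduce to `(max_price, [timestamps])`. The plain-Python sketch below, with illustrative values, shows such a combiner; `keep_max` is a hypothetical helper that could be passed to `reduceByKey` after mapping each row to `(symbol, (high, [datetime]))`.

```python
from functools import reduce

# Illustrative (high, datetime) values for one symbol; the maximum occurs twice
values = [
    (434.5, "2024-10-30 15:40:00"),
    (434.91, "2024-10-30 15:46:00"),
    (434.91, "2024-10-30 15:47:00"),
]

def keep_max(a, b):
    """Combine two (price, [timestamps]) pairs, keeping every timestamp tied at the max."""
    if a[0] > b[0]:
        return a
    if b[0] > a[0]:
        return b
    return (a[0], a[1] + b[1])

best_price, best_times = reduce(keep_max, [(p, [t]) for p, t in values])
print("{} {}".format(best_price, sorted(best_times)))
```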


# Step 3. Part B
In this step I use Spark SQL to analyze the data transferred to Hadoop in the previous project. Only Spark DataFrames and their operations are used to answer the questions.

In [15]:
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from datetime import datetime

# Initialize SQLContext
sqlContext = SQLContext(sc)

# Define the schema
schema = StructType([
    StructField("stock", StringType(), True),
    StructField("datetime", TimestampType(), True), 
    StructField("volume", IntegerType(), True),
    StructField("open", DoubleType(), True),
    StructField("high", DoubleType(), True),
    StructField("low", DoubleType(), True),
    StructField("close", DoubleType(), True)
])

# Load the CSV file as RDD
file_path = "/training/flume/stock_csv.csv"  # Replace with your HDFS path
rdd = sc.textFile(file_path)

# Extract header
header = rdd.first()

# Filter header and split lines
data_rdd = rdd.filter(lambda line: line != header).map(lambda line: line.split(","))

# Function to convert datetime string to Python datetime object
def convert_to_datetime(datetime_str):
    return datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S")

# Explicitly cast each field to the correct type
row_rdd = data_rdd.map(lambda fields: (
    fields[0].encode("utf-8"),  # stock (String)
    convert_to_datetime(fields[1].encode("utf-8")),  # datetime (Datetime)
    int(fields[2]),             # volume (Integer)
    float(fields[3]),           # open (Double)
    float(fields[4]),           # high (Double)
    float(fields[5]),           # low (Double)
    float(fields[6])            # close (Double)
))

# Create DataFrame from RDD
stock_df = sqlContext.createDataFrame(row_rdd, schema)

# Register the DataFrame as a temporary SQL table
stock_df.registerTempTable("stocks")

# Verify schema and data
stock_df.printSchema()
stock_df.show()


root
 |-- stock: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- volume: integer (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)

stock datetime             volume open    high    low     close  
AAPL  2024-10-21 12:28:... 484    236.27  236.27  236.26  236.26 
IBM   2024-10-21 12:25:... 120    231.53  231.53  231.53  231.53 
GOOGL 2024-10-21 12:28:... 346    163.535 163.535 163.5   163.5  
MSFT  2024-10-21 12:27:... 456    415.16  415.16  415.06  415.06 
TSLA  2024-10-21 12:28:... 732    218.56  218.58  218.52  218.58 
AAPL  2024-10-22 14:54:... 1097   235.325 235.35  235.32  235.325
IBM   2024-10-22 14:53:... 333    231.78  231.78  231.77  231.77 
GOOGL 2024-10-22 14:54:... 102    164.69  164.69  164.69  164.69 
MSFT  2024-10-22 14:54:... 590    429.27  429.27  429.19  429.19 
TSLA  2024-10-22 14:54:... 441    217.26  217.335 217.26  217.325

## 1) How many records are there in the table?

In [16]:
sqlContext.sql("SELECT COUNT(*) AS record_count FROM stocks").show()
# This count excludes the header row, which was dropped before the table was created.
# The project 2 count was one higher because the header had not been removed there.


record_count
3110        


## 2) How many different days are there in the table?


In [17]:
from pyspark.sql.functions import col

# Extract the date from the timestamp
distinct_days = stock_df.select(col("datetime").cast("date")).distinct().count()
print("Distinct days: {}".format(distinct_days))

Distinct days: 5


## 3) How many records are there for each day?

In [18]:
# Group by the extracted date and count the records
records_per_day = stock_df.withColumn("date", col("datetime").cast("date")) \
    .groupBy("date").count().orderBy("date")
records_per_day.show()


date       count
2024-10-21 800  
2024-10-22 800  
2024-10-23 635  
2024-10-24 800  
2024-10-30 75   
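Casting the timestamp to a date and grouping on it is ordinary SQL, so the same query shape can be tried locally without a cluster. The sketch below uses Python's built-in `sqlite3` module as a stand-in for Spark SQL, with SQLite's `date()` function playing the role of the cast to `date`. The table name `stocks` matches the temporary table registered above, but the schema is trimmed and the rows are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stocks (stock TEXT, datetime TEXT, close REAL)")
conn.executemany(
    "INSERT INTO stocks VALUES (?, ?, ?)",
    [
        ("AAPL", "2024-10-21 12:28:00", 236.26),
        ("IBM", "2024-10-21 12:25:00", 231.53),
        ("AAPL", "2024-10-22 14:54:00", 235.325),
    ],
)

# Group by the calendar date extracted from the timestamp string
rows = conn.execute(
    """
    SELECT date(datetime) AS day, COUNT(*) AS record_count
    FROM stocks
    GROUP BY date(datetime)
    ORDER BY day
    """
).fetchall()
conn.close()
print(rows)  # [('2024-10-21', 2), ('2024-10-22', 1)]
```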


## 4) What are the symbols in the table?

In [19]:
# Get distinct symbols
symbols = stock_df.select("stock").distinct()
symbols.show()


stock
AAPL 
GOOGL
IBM  
MSFT 
TSLA 


## 5) What is the highest price for each symbol?

In [20]:
# Group by symbol and get the highest price for each symbol
highest_price = stock_df.groupBy("stock").agg({"high": "max"})
highest_price.show()


stock MAX(high#4)
AAPL  236.8      
GOOGL 175.66     
IBM   233.325    
MSFT  434.91     
TSLA  259.43     


## 6) What is the lowest price for each symbol?

In [21]:
# Group by symbol and get the lowest price for each
lowest_price = stock_df.groupBy("stock").agg({"low": "min"})
lowest_price.show()


stock MIN(low#5)
AAPL  228.06    
GOOGL 161.925   
IBM   204.51    
MSFT  414.92    
TSLA  212.4     


## 7) What is the average price for each symbol?

In [22]:
# Group by symbol and compute the average closing price for each symbol
average_price = sqlContext.sql("""
    SELECT stock, AVG(close) AS average_price
    FROM stocks
    GROUP BY stock
""")
average_price.show()


stock average_price     
AAPL  233.3181752411574 
GOOGL 163.89336816720268
IBM   227.56491157556266
MSFT  424.2107717041801 
TSLA  227.69815916398719


## 8) What is the range of price for each symbol?

In [23]:
# Range = highest closing price - lowest closing price
sqlContext.sql("""
    SELECT stock AS symbol, MAX(close) - MIN(close) AS price_range
    FROM stocks
    GROUP BY stock
""").show()


symbol price_range       
AAPL   8.719999999999999 
GOOGL  13.689999999999998
IBM    28.769999999999982
MSFT   19.95999999999998 
TSLA   46.94             


## 9) What is the date on which each symbol experienced the highest price?

In [24]:
query = sqlContext.sql("""
    SELECT DISTINCT s.stock, s.datetime AS event_date, s.high
    FROM stocks s
    INNER JOIN (
        SELECT stock, MAX(high) AS max_high
        FROM stocks
        GROUP BY stock
    ) max_prices
    ON s.stock = max_prices.stock
    AND s.high = max_prices.max_high
    ORDER BY stock ASC
""")

# Show the results
query.show()

# Take the stock symbol (first column) from the first row of the result
first_result = query.first()[0]

print(first_result)


stock event_date           high   
AAPL  2024-10-21 14:04:... 236.8  
GOOGL 2024-10-30 15:43:... 175.66 
GOOGL 2024-10-30 15:44:... 175.66 
IBM   2024-10-23 15:56:... 233.325
MSFT  2024-10-30 15:47:... 434.91 
MSFT  2024-10-30 15:46:... 434.91 
TSLA  2024-10-30 15:45:... 259.43 
TSLA  2024-10-30 15:49:... 259.43 
AAPL
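Joining each row to a per-symbol `MAX(high)` subquery returns every timestamp at which the maximum was reached, which is why GOOGL, MSFT and TSLA each appear twice above. The same join pattern can be tried locally with Python's built-in `sqlite3` module as a stand-in for Spark SQL; the trimmed schema and the rows below are illustrative, loosely modelled on the output above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stocks (stock TEXT, datetime TEXT, high REAL)")
conn.executemany(
    "INSERT INTO stocks VALUES (?, ?, ?)",
    [
        ("MSFT", "2024-10-30 15:46:00", 434.91),
        ("MSFT", "2024-10-30 15:47:00", 434.91),
        ("MSFT", "2024-10-21 12:27:00", 415.16),
        ("IBM", "2024-10-23 15:56:00", 233.325),
        ("IBM", "2024-10-21 12:25:00", 231.53),
    ],
)

# Join each row to its symbol's maximum high; ties produce one row per timestamp
rows = conn.execute(
    """
    SELECT s.stock, s.datetime, s.high
    FROM stocks s
    JOIN (SELECT stock, MAX(high) AS max_high FROM stocks GROUP BY stock) m
      ON s.stock = m.stock AND s.high = m.max_high
    ORDER BY s.stock, s.datetime
    """
).fetchall()
conn.close()
print(rows)
```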
