In [47]:
import os
import requests
from pyspark.sql.functions import lit
from pyspark.sql import SparkSession, functions as f

In [48]:
from dotenv import load_dotenv
load_dotenv()
key=os.environ.get("KEY")
spark = SparkSession.builder.getOrCreate()

Amazon.com Inc (AMZN)

In [49]:
symbol = 'AMZN'
url = f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY_ADJUSTED&symbol={symbol}&apikey={key}'
r = requests.get(url)
data = [r.json()]

company_name = 'Amazon.com Inc'
rdd = spark.sparkContext.parallelize(data)
df = spark.read.json(rdd)
clean_data = df.select(f.col("Time Series (Daily).2022-12-22.`1. open`"), \
            f.col("Time Series (Daily).2022-12-22.`2. high`"), \
            f.col("Time Series (Daily).2022-12-22.`3. low`"), \
            f.col("Time Series (Daily).2022-12-22.`4. close`"))

add_company_col =clean_data.withColumn("company_name", lit(company_name)) \

amzn_com_df = add_company_col.toDF("open","high","low","close","company_name")
amzn_com_df.show()

+-----+-----+-----+-----+--------------+
| open| high|  low|close|  company_name|
+-----+-----+-----+-----+--------------+
|85.52|85.68|82.25|83.79|Amazon.com Inc|
+-----+-----+-----+-----+--------------+



In [50]:
symbol = 'AMZN'
url = f'https://www.alphavantage.co/query?function=OVERVIEW&symbol={symbol}&apikey={key}'
r = requests.get(url)
data = [r.json()]

company_name = 'Amazon.com Inc'
rdd = spark.sparkContext.parallelize(data)
df = spark.read.json(rdd)
clean_data = df.select(f.col("PERatio"),f.col("GrossProfitTTM"))
add_company_col =clean_data.withColumn("company_name", lit(company_name))
amzn_com_df1 = add_company_col.toDF("PERatio","GrossProfitTTM","companyname")
amzn_com_df1.show()

+-------+--------------+--------------+
|PERatio|GrossProfitTTM|   companyname|
+-------+--------------+--------------+
|  76.12|  197478000000|Amazon.com Inc|
+-------+--------------+--------------+



In [51]:
with_peratio=amzn_com_df.join(amzn_com_df1,amzn_com_df.company_name ==  amzn_com_df1.companyname,"inner")
amzn_com=with_peratio.withColumnRenamed('PERatio', 'PERatio_of_last_quarter').withColumnRenamed('GrossProfitTTM', 'profit_margin_last_four_quaters')
amzn_com.select('open','high','low','close','PERatio_of_last_quarter','profit_margin_last_four_quaters','company_name').show(truncate=False)

+-----+-----+-----+-----+-----------------------+-------------------------------+--------------+
|open |high |low  |close|PERatio_of_last_quarter|profit_margin_last_four_quaters|company_name  |
+-----+-----+-----+-----+-----------------------+-------------------------------+--------------+
|85.52|85.68|82.25|83.79|76.12                  |197478000000                   |Amazon.com Inc|
+-----+-----+-----+-----+-----------------------+-------------------------------+--------------+



Intel Corporation (INTC)

In [52]:
symbol = 'INTC'
url = f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY_ADJUSTED&symbol={symbol}&apikey={key}'
r = requests.get(url)
data = [r.json()]

company_name = 'Intel Corporation'
rdd = spark.sparkContext.parallelize(data)
df = spark.read.json(rdd)
clean_data = df.select(f.col("Time Series (Daily).2022-12-22.`1. open`"), \
            f.col("Time Series (Daily).2022-12-22.`2. high`"), \
            f.col("Time Series (Daily).2022-12-22.`3. low`"), \
            f.col("Time Series (Daily).2022-12-22.`4. close`"))

add_company_col =clean_data.withColumn("company_name", lit(company_name)) \

intel_com_df = add_company_col.toDF("open","high","low","close","company_name")
intel_com_df.show()

+-----+------+-----+-----+-----------------+
| open|  high|  low|close|     company_name|
+-----+------+-----+-----+-----------------+
|26.45|26.452|25.35|25.97|Intel Corporation|
+-----+------+-----+-----+-----------------+



In [53]:
symbol = 'INTC'
url = f'https://www.alphavantage.co/query?function=OVERVIEW&symbol={symbol}&apikey={key}'
r = requests.get(url)
data = [r.json()]

company_name = 'Intel Corporation'
rdd = spark.sparkContext.parallelize(data)
df = spark.read.json(rdd)
clean_data = df.select(f.col("PERatio"),f.col("GrossProfitTTM"))
add_company_col =clean_data.withColumn("company_name", lit(company_name))
intel_com_df1 = add_company_col.toDF("PERatio","GrossProfitTTM","companyname")
intel_com_df1.show()

+-------+--------------+-----------------+
|PERatio|GrossProfitTTM|      companyname|
+-------+--------------+-----------------+
|   8.08|   43815000000|Intel Corporation|
+-------+--------------+-----------------+



In [54]:
with_peratio=intel_com_df.join(intel_com_df1,intel_com_df.company_name ==  intel_com_df1.companyname,"inner")
intel_com=with_peratio.withColumnRenamed('PERatio', 'PERatio_of_last_quarter').withColumnRenamed('GrossProfitTTM', 'profit_margin_last_four_quaters')
intel_com.select('open','high','low','close','PERatio_of_last_quarter','profit_margin_last_four_quaters','company_name').show(truncate=False)

+-----+------+-----+-----+-----------------------+-------------------------------+-----------------+
|open |high  |low  |close|PERatio_of_last_quarter|profit_margin_last_four_quaters|company_name     |
+-----+------+-----+-----+-----------------------+-------------------------------+-----------------+
|26.45|26.452|25.35|25.97|8.08                   |43815000000                    |Intel Corporation|
+-----+------+-----+-----+-----------------------+-------------------------------+-----------------+



                                                                                

JPMorgan Chase & Co (JPM)

In [55]:
symbol = 'JPM'
url = f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY_ADJUSTED&symbol={symbol}&apikey={key}'
r = requests.get(url)
data = [r.json()]

company_name = 'JPMorgan Chase & Co'
rdd = spark.sparkContext.parallelize(data)
df = spark.read.json(rdd)
clean_data = df.select(f.col("Time Series (Daily).2022-12-22.`1. open`"), \
            f.col("Time Series (Daily).2022-12-22.`2. high`"), \
            f.col("Time Series (Daily).2022-12-22.`3. low`"), \
            f.col("Time Series (Daily).2022-12-22.`4. close`"))

add_company_col =clean_data.withColumn("company_name", lit(company_name)) \

jp_com_df = add_company_col.toDF("open","high","low","close","company_name")
jp_com_df.show()

+-----+-----+------+------+-------------------+
| open| high|   low| close|       company_name|
+-----+-----+------+------+-------------------+
|131.1|131.3|128.41|130.66|JPMorgan Chase & Co|
+-----+-----+------+------+-------------------+



In [57]:
symbol = 'JPM'
url = f'https://www.alphavantage.co/query?function=OVERVIEW&symbol={symbol}&apikey={key}'
r = requests.get(url)
data = [r.json()]

company_name = 'JPMorgan Chase & Co'
rdd = spark.sparkContext.parallelize(data)
df = spark.read.json(rdd)
clean_data = df.select(f.col("PERatio"),f.col("GrossProfitTTM"))
add_company_col =clean_data.withColumn("company_name", lit(company_name))
jp_com_df1 = add_company_col.toDF("PERatio","GrossProfitTTM","companyname")
jp_com_df1.show()

+-------+--------------+-------------------+
|PERatio|GrossProfitTTM|        companyname|
+-------+--------------+-------------------+
|  11.08|  130898000000|JPMorgan Chase & Co|
+-------+--------------+-------------------+



In [58]:
with_peratio=jp_com_df.join(jp_com_df1,jp_com_df.company_name ==  jp_com_df1.companyname,"inner")
jp_com=with_peratio.withColumnRenamed('PERatio', 'PERatio_of_last_quarter').withColumnRenamed('GrossProfitTTM', 'profit_margin_last_four_quaters')
jp_com.select('open','high','low','close','PERatio_of_last_quarter','profit_margin_last_four_quaters','company_name').show(truncate=False)

+-----+-----+------+------+-----------------------+-------------------------------+-------------------+
|open |high |low   |close |PERatio_of_last_quarter|profit_margin_last_four_quaters|company_name       |
+-----+-----+------+------+-----------------------+-------------------------------+-------------------+
|131.1|131.3|128.41|130.66|11.08                  |130898000000                   |JPMorgan Chase & Co|
+-----+-----+------+------+-----------------------+-------------------------------+-------------------+



                                                                                

Coca-Cola Co (KO)

In [59]:
symbol = 'KO'
url = f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY_ADJUSTED&symbol={symbol}&apikey={key}'
r = requests.get(url)
data = [r.json()]

company_name = 'Coca-Cola Co'
rdd = spark.sparkContext.parallelize(data)
df = spark.read.json(rdd)
clean_data = df.select(f.col("Time Series (Daily).2022-12-22.`1. open`"), \
            f.col("Time Series (Daily).2022-12-22.`2. high`"), \
            f.col("Time Series (Daily).2022-12-22.`3. low`"), \
            f.col("Time Series (Daily).2022-12-22.`4. close`"))

add_company_col =clean_data.withColumn("company_name", lit(company_name)) \

coca_com_df = add_company_col.toDF("open","high","low","close","company_name")
coca_com_df.show()

+-----+-----+------+-----+------------+
| open| high|   low|close|company_name|
+-----+-----+------+-----+------------+
|63.42|63.59|62.645|63.34|Coca-Cola Co|
+-----+-----+------+-----+------------+



In [60]:
symbol = 'KO'
url = f'https://www.alphavantage.co/query?function=OVERVIEW&symbol={symbol}&apikey={key}'
r = requests.get(url)
data = [r.json()]

company_name = 'Coca-Cola Co'
rdd = spark.sparkContext.parallelize(data)
df = spark.read.json(rdd)
clean_data = df.select(f.col("PERatio"),f.col("GrossProfitTTM"))
add_company_col =clean_data.withColumn("company_name", lit(company_name))
coca_com_df1 = add_company_col.toDF("PERatio","GrossProfitTTM","companyname")
coca_com_df1.show()

+-------+--------------+------------+
|PERatio|GrossProfitTTM| companyname|
+-------+--------------+------------+
|  27.99|   23298000000|Coca-Cola Co|
+-------+--------------+------------+



In [61]:
with_peratio=coca_com_df.join(coca_com_df1,coca_com_df.company_name ==  coca_com_df1.companyname,"inner")
coca_cola_com=with_peratio.withColumnRenamed('PERatio', 'PERatio_of_last_quarter').withColumnRenamed('GrossProfitTTM', 'profit_margin_last_four_quaters')
coca_cola_com.select('open','high','low','close','PERatio_of_last_quarter','profit_margin_last_four_quaters','company_name').show(truncate=False)

+-----+-----+------+-----+-----------------------+-------------------------------+------------+
|open |high |low   |close|PERatio_of_last_quarter|profit_margin_last_four_quaters|company_name|
+-----+-----+------+-----+-----------------------+-------------------------------+------------+
|63.42|63.59|62.645|63.34|27.99                  |23298000000                    |Coca-Cola Co|
+-----+-----+------+-----+-----------------------+-------------------------------+------------+



                                                                                

Walt Disney Company (DIS)

In [62]:
symbol = 'DIS'
url = f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY_ADJUSTED&symbol={symbol}&apikey={key}'
r = requests.get(url)
data = [r.json()]

company_name = 'Walt Disney Company'
rdd = spark.sparkContext.parallelize(data)
df = spark.read.json(rdd)
clean_data = df.select(f.col("Time Series (Daily).2022-12-22.`1. open`"), \
            f.col("Time Series (Daily).2022-12-22.`2. high`"), \
            f.col("Time Series (Daily).2022-12-22.`3. low`"), \
            f.col("Time Series (Daily).2022-12-22.`4. close`"))

add_company_col =clean_data.withColumn("company_name", lit(company_name))
disney_com_df = add_company_col.toDF("open","high","low","close","company_name")
disney_com_df.show()

+-----+-----+-----+-----+-------------------+
| open| high|  low|close|       company_name|
+-----+-----+-----+-----+-------------------+
|86.03|86.73|84.69|86.67|Walt Disney Company|
+-----+-----+-----+-----+-------------------+



In [63]:
symbol = 'DIS'
url = f'https://www.alphavantage.co/query?function=OVERVIEW&symbol={symbol}&apikey={key}'
r = requests.get(url)
data = [r.json()]

company_name = 'Walt Disney Company'
rdd = spark.sparkContext.parallelize(data)
df = spark.read.json(rdd)
clean_data = df.select(f.col("PERatio"),f.col("GrossProfitTTM"))
add_company_col =clean_data.withColumn("company_name", lit(company_name))
disney_com_df1 = add_company_col.toDF("PERatio","GrossProfitTTM","companyname")
disney_com_df1.show()

+-------+--------------+-------------------+
|PERatio|GrossProfitTTM|        companyname|
+-------+--------------+-------------------+
|  49.44|   28321000000|Walt Disney Company|
+-------+--------------+-------------------+



In [64]:
with_peratio=disney_com_df.join(disney_com_df1,disney_com_df.company_name ==  disney_com_df1.companyname,"inner")
disney_com=with_peratio.withColumnRenamed('PERatio', 'PERatio_of_last_quarter').withColumnRenamed('GrossProfitTTM', 'profit_margin_last_four_quaters')
disney_com.select('open','high','low','close','PERatio_of_last_quarter','profit_margin_last_four_quaters','company_name').show(truncate=False)

+-----+-----+-----+-----+-----------------------+-------------------------------+-------------------+
|open |high |low  |close|PERatio_of_last_quarter|profit_margin_last_four_quaters|company_name       |
+-----+-----+-----+-----+-----------------------+-------------------------------+-------------------+
|86.03|86.73|84.69|86.67|49.44                  |28321000000                    |Walt Disney Company|
+-----+-----+-----+-----+-----------------------+-------------------------------+-------------------+



                                                                                

In [65]:
union1 = amzn_com.union(intel_com)
union2 = union1.union(jp_com)
union3 = union2.union(coca_cola_com)
union4 = union3.union(disney_com)
final_df = union4
final_df.printSchema()
final_df.show()

root
 |-- open: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- close: string (nullable = true)
 |-- company_name: string (nullable = false)
 |-- PERatio_of_last_quarter: string (nullable = true)
 |-- profit_margin_last_four_quaters: string (nullable = true)
 |-- companyname: string (nullable = false)





+-----+------+------+------+-------------------+-----------------------+-------------------------------+-------------------+
| open|  high|   low| close|       company_name|PERatio_of_last_quarter|profit_margin_last_four_quaters|        companyname|
+-----+------+------+------+-------------------+-----------------------+-------------------------------+-------------------+
|85.52| 85.68| 82.25| 83.79|     Amazon.com Inc|                  76.12|                   197478000000|     Amazon.com Inc|
|26.45|26.452| 25.35| 25.97|  Intel Corporation|                   8.08|                    43815000000|  Intel Corporation|
|131.1| 131.3|128.41|130.66|JPMorgan Chase & Co|                  11.08|                   130898000000|JPMorgan Chase & Co|
|63.42| 63.59|62.645| 63.34|       Coca-Cola Co|                  27.99|                    23298000000|       Coca-Cola Co|
|86.03| 86.73| 84.69| 86.67|Walt Disney Company|                  49.44|                    28321000000|Walt Disney Company|


                                                                                

In [66]:
spark.stop()