In [1]:
import datetime
import os
import argparse
import pandas as pd
import yfinance as yahooFinance
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from google.cloud import storage

In [2]:
credentials_location = '/home/codespace/.config/gcloud/application_default_credentials.json'
bucket = os.environ.get("GCP_GCS_BUCKET", "dataenggzoomcamp_proj_yf")

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)



In [3]:
sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")



24/04/14 08:56:19 WARN Utils: Your hostname, codespaces-6c38e6 resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
24/04/14 08:56:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/04/14 08:56:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
year = 2023
#parser = argparse.ArgumentParser()
#parser.add_argument('--year', required=True)
#args = parser.parse_args()
#year = args.year

# startDate , as per our convenience we can modify
startDate = datetime.datetime(year, 1, 1)
 
# endDate , as per our convenience we can modify
endDate = datetime.datetime(year, 12, 31)


In [5]:
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

In [6]:
# Read and print the stock tickers that make up S&P500
tickers = pd.read_html('https://en.wikipedia.org/wiki/List_of_S%26P_500_companies')[0]
tickers.to_csv("list_500_companies.csv")

files = ['META', 'AMZN', 'GOOGL', 'AMD']
schema = types.StructType([
    types.StructField('Date', types.DateType(), True),
    types.StructField('Open', types.FloatType(), True),
    types.StructField('High', types.FloatType(), True),
    types.StructField('Low', types.FloatType(), True),
    types.StructField('Close', types.FloatType(), True),
    types.StructField('Volume', types.IntegerType(), True),
    types.StructField('Dividends', types.FloatType(), True),
    types.StructField('Stock Splits', types.FloatType(), True)
])

In [48]:
def upload_to_gcs(bucket, gcs_folder, local_folder):
    """
    Ref: https://cloud.google.com/storage/docs/uploading-objects#storage-upload-object-python
    """
    # # WORKAROUND to prevent timeout for files > 6 MB on 800 kbps upload speed.
    # # (Ref: https://github.com/googleapis/python-storage/issues/74)
    # storage.blob._MAX_MULTIPART_SIZE = 5 * 1024 * 1024  # 5 MB
    # storage.blob._DEFAULT_CHUNKSIZE = 5 * 1024 * 1024  # 5 MB

    client = storage.Client()
    bucket = client.bucket(bucket)
    #blob = bucket.blob(object_name)
    #blob.upload_from_filename(local_file)

    from os import listdir
    from os.path import isfile, join
    files = [f for f in listdir(local_folder) if isfile(join(local_folder, f))]
    #files = ['part-00000-f0df2b0f-4b5b-4d41-8675-dad7e879e5a8-c000.snappy.parquet', 'part-00001-f0df2b0f-4b5b-4d41-8675-dad7e879e5a8-c000.snappy.parquet']
    for file in files:
        print(file)
        local_file = local_folder + file
        print(local_file)
        gcs_file = gcs_folder + file
        print(gcs_file)
        blob = bucket.blob(gcs_file)
        blob.upload_from_filename(local_file)

In [50]:
for f in files:
    stock = yahooFinance.Ticker(f)
    # pass the parameters as the taken dates for start and end
    hist = stock.history(start=startDate,end=endDate)
    filename = f'{f}_{year}.csv'
    print(f'Writing to csv file - {filename}')
    hist.to_csv(f'{f}_{year}.csv')
    print(filename)
    df = spark.read \
        .option("header", "true") \
        .schema(schema) \
        .csv(f'{f}_{year}.csv')
    df = df.repartition(24)
    pq_path = f'yf/pq/{f}/{year}/'
    gcs_path = f'pq/{f}/{year}/'
    print(f"Writing to pq file - {pq_path}")
    df.write.parquet(pq_path, mode='overwrite')
    # upload it to gcs 
    print(f"Writing to GCS - {gcs_path}")
    upload_to_gcs(bucket, gcs_path, pq_path)
    df_pq = spark.read.parquet(f'gs://dataenggzoomcamp_proj_yf/pq/{f}/*')
    df_pq.registerTempTable(f'yf_{f}')

Writing to csv file - META_2023.csv
META_2023.csv
Writing to pq file - yf/pq/META/2023/


                                                                                

Writing to GCS - pq/META/2023/
.part-00000-395e8e7b-fe14-49e1-8f2d-68cd5327d785-c000.snappy.parquet.crc
yf/pq/META/2023/.part-00000-395e8e7b-fe14-49e1-8f2d-68cd5327d785-c000.snappy.parquet.crc
pq/META/2023/.part-00000-395e8e7b-fe14-49e1-8f2d-68cd5327d785-c000.snappy.parquet.crc
.part-00012-395e8e7b-fe14-49e1-8f2d-68cd5327d785-c000.snappy.parquet.crc
yf/pq/META/2023/.part-00012-395e8e7b-fe14-49e1-8f2d-68cd5327d785-c000.snappy.parquet.crc
pq/META/2023/.part-00012-395e8e7b-fe14-49e1-8f2d-68cd5327d785-c000.snappy.parquet.crc
.part-00011-395e8e7b-fe14-49e1-8f2d-68cd5327d785-c000.snappy.parquet.crc
yf/pq/META/2023/.part-00011-395e8e7b-fe14-49e1-8f2d-68cd5327d785-c000.snappy.parquet.crc
pq/META/2023/.part-00011-395e8e7b-fe14-49e1-8f2d-68cd5327d785-c000.snappy.parquet.crc
.part-00002-395e8e7b-fe14-49e1-8f2d-68cd5327d785-c000.snappy.parquet.crc
yf/pq/META/2023/.part-00002-395e8e7b-fe14-49e1-8f2d-68cd5327d785-c000.snappy.parquet.crc
pq/META/2023/.part-00002-395e8e7b-fe14-49e1-8f2d-68cd5327d785-c



Writing to csv file - AMZN_2023.csv
AMZN_2023.csv
Writing to pq file - yf/pq/AMZN/2023/


                                                                                

Writing to GCS - pq/AMZN/2023/
part-00010-d717770d-26ed-4dd2-ac56-bc8f81259f9a-c000.snappy.parquet
yf/pq/AMZN/2023/part-00010-d717770d-26ed-4dd2-ac56-bc8f81259f9a-c000.snappy.parquet
pq/AMZN/2023/part-00010-d717770d-26ed-4dd2-ac56-bc8f81259f9a-c000.snappy.parquet
part-00020-d717770d-26ed-4dd2-ac56-bc8f81259f9a-c000.snappy.parquet
yf/pq/AMZN/2023/part-00020-d717770d-26ed-4dd2-ac56-bc8f81259f9a-c000.snappy.parquet
pq/AMZN/2023/part-00020-d717770d-26ed-4dd2-ac56-bc8f81259f9a-c000.snappy.parquet
part-00000-d717770d-26ed-4dd2-ac56-bc8f81259f9a-c000.snappy.parquet
yf/pq/AMZN/2023/part-00000-d717770d-26ed-4dd2-ac56-bc8f81259f9a-c000.snappy.parquet
pq/AMZN/2023/part-00000-d717770d-26ed-4dd2-ac56-bc8f81259f9a-c000.snappy.parquet
part-00018-d717770d-26ed-4dd2-ac56-bc8f81259f9a-c000.snappy.parquet
yf/pq/AMZN/2023/part-00018-d717770d-26ed-4dd2-ac56-bc8f81259f9a-c000.snappy.parquet
pq/AMZN/2023/part-00018-d717770d-26ed-4dd2-ac56-bc8f81259f9a-c000.snappy.parquet
part-00005-d717770d-26ed-4dd2-ac56-bc



Writing to csv file - GOOGL_2023.csv
GOOGL_2023.csv
Writing to pq file - yf/pq/GOOGL/2023/


                                                                                

Writing to GCS - pq/GOOGL/2023/
part-00016-3d5d6e23-cf24-4823-91d0-a27c683d90c8-c000.snappy.parquet
yf/pq/GOOGL/2023/part-00016-3d5d6e23-cf24-4823-91d0-a27c683d90c8-c000.snappy.parquet
pq/GOOGL/2023/part-00016-3d5d6e23-cf24-4823-91d0-a27c683d90c8-c000.snappy.parquet
.part-00020-3d5d6e23-cf24-4823-91d0-a27c683d90c8-c000.snappy.parquet.crc
yf/pq/GOOGL/2023/.part-00020-3d5d6e23-cf24-4823-91d0-a27c683d90c8-c000.snappy.parquet.crc
pq/GOOGL/2023/.part-00020-3d5d6e23-cf24-4823-91d0-a27c683d90c8-c000.snappy.parquet.crc
part-00005-3d5d6e23-cf24-4823-91d0-a27c683d90c8-c000.snappy.parquet
yf/pq/GOOGL/2023/part-00005-3d5d6e23-cf24-4823-91d0-a27c683d90c8-c000.snappy.parquet
pq/GOOGL/2023/part-00005-3d5d6e23-cf24-4823-91d0-a27c683d90c8-c000.snappy.parquet
.part-00010-3d5d6e23-cf24-4823-91d0-a27c683d90c8-c000.snappy.parquet.crc
yf/pq/GOOGL/2023/.part-00010-3d5d6e23-cf24-4823-91d0-a27c683d90c8-c000.snappy.parquet.crc
pq/GOOGL/2023/.part-00010-3d5d6e23-cf24-4823-91d0-a27c683d90c8-c000.snappy.parquet.cr



Writing to csv file - AMD_2023.csv
AMD_2023.csv
Writing to pq file - yf/pq/AMD/2023/


                                                                                

Writing to GCS - pq/AMD/2023/
part-00014-0b77e060-6e47-4c8d-a651-7a464548737f-c000.snappy.parquet
yf/pq/AMD/2023/part-00014-0b77e060-6e47-4c8d-a651-7a464548737f-c000.snappy.parquet
pq/AMD/2023/part-00014-0b77e060-6e47-4c8d-a651-7a464548737f-c000.snappy.parquet
part-00016-0b77e060-6e47-4c8d-a651-7a464548737f-c000.snappy.parquet
yf/pq/AMD/2023/part-00016-0b77e060-6e47-4c8d-a651-7a464548737f-c000.snappy.parquet
pq/AMD/2023/part-00016-0b77e060-6e47-4c8d-a651-7a464548737f-c000.snappy.parquet
.part-00001-0b77e060-6e47-4c8d-a651-7a464548737f-c000.snappy.parquet.crc
yf/pq/AMD/2023/.part-00001-0b77e060-6e47-4c8d-a651-7a464548737f-c000.snappy.parquet.crc
pq/AMD/2023/.part-00001-0b77e060-6e47-4c8d-a651-7a464548737f-c000.snappy.parquet.crc
.part-00019-0b77e060-6e47-4c8d-a651-7a464548737f-c000.snappy.parquet.crc
yf/pq/AMD/2023/.part-00019-0b77e060-6e47-4c8d-a651-7a464548737f-c000.snappy.parquet.crc
pq/AMD/2023/.part-00019-0b77e060-6e47-4c8d-a651-7a464548737f-c000.snappy.parquet.crc
part-00007-0b77e



In [55]:
df_result = spark.sql("""
SELECT
    'META', Date, Open, High, Low, Close, Volume
FROM
    yf_META

union

SELECT
    'AMZN', Date, Open, High, Low, Close, Volume
FROM
    yf_AMZN

union

SELECT
    'GOOGL', Date, Open, High, Low, Close, Volume
FROM
    yf_GOOGL

union

SELECT
    'AMD', Date, Open, High, Low, Close, Volume
FROM
    yf_AMD
    
""")

In [56]:
df_result.show()



+----+----------+----------+---------+---------+---------+--------+
|META|      Date|      Open|     High|      Low|    Close|  Volume|
+----+----------+----------+---------+---------+---------+--------+
|META|2023-02-16|  172.5669|175.66362|171.60791|172.25723|25827500|
|META|2023-02-23|  171.8177| 173.5059|169.20049|171.85765|20017800|
|META|2023-06-09|  262.2018|267.66602|261.42264| 264.6692|16938500|
|META|2023-08-18| 278.73428| 285.3872| 274.0892| 282.9498|34061200|
|META|2023-01-06| 128.83331|130.19186|125.90642| 129.8822|27584500|
|META|2023-05-02| 242.92224| 244.6604| 238.7367|238.98643|24350100|
|META|2023-08-01| 317.20346|323.79645| 314.3265|322.36795|22817900|
|META|2023-07-31| 323.34692|325.31485| 317.2534|318.26233|25799600|
|META|2023-03-13|  177.7714|183.58522|174.63472|180.70827|24728000|
|META|2023-05-19| 247.20772|248.42642|243.15202|245.37965|21599800|
|META|2023-09-15| 311.27972| 311.6693|298.43335| 299.9917|28106400|
|META|2023-01-05|125.996315|128.38379|  124.408|

                                                                                

In [57]:
df_result.coalesce(1).write.parquet('yf/out/', mode='overwrite')
#df_result.write.format('bigquery') \
#    .option('table', 'gs://dataenggzoomcamp_proj_yf/report/2023') \
#    .save()

                                                                                