# Setup Spark

In [None]:
# Install a headless Java 8 JDK (apt output is discarded) and the findspark
# package (pip runs in quiet mode).
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q findspark

In [None]:
!wget -q https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz

In [None]:
# Unpack the downloaded Spark archive into the current working directory.
!tar xf spark-3.0.3-bin-hadoop2.7.tgz

# Spark initialization

In [None]:
import os
# Export the JVM location and the directory of the unpacked Spark
# distribution for the rest of the notebook session.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.3-bin-hadoop2.7",
})

In [None]:
import findspark
# Initialise findspark before pyspark is imported in the next cell
# (presumably it resolves the installation from SPARK_HOME set above).
findspark.init()
# Last expression of the cell: its value, the detected Spark home path, is
# what the notebook shows as the cell output.
findspark.find()

'/content/spark-3.0.3-bin-hadoop2.7'

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the notebook's Spark session: "local" master, application
# name "Colab", Spark UI port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [None]:
# Evaluate the session object so the notebook renders it as the cell output.
spark

# JSON object classes

In [None]:
class Date:
  """Calendar date parsed from a "<year>-<month>-<day>" string.

  Only the three integer fields are stored. Instances compare, order and hash
  by the tuple (year, month, day). Comparison is duck-typed: any object that
  exposes ``year``, ``month`` and ``day`` can be compared; anything else makes
  the comparison return ``NotImplemented`` (so ``Date(...) == None`` is simply
  ``False`` instead of raising ``AttributeError``).

  Instances are hashable; do not modify the fields of an instance that is
  stored in a set or used as a dict key.
  """

  def __init__(self, date_pattern):
    """Parse ``date_pattern``, three integers separated by "-".

    Raises:
      ValueError: if the string does not split into exactly three integers.
      AssertionError: if year < 0, month is outside 1..12 or day is outside
        1..31.
    """
    self.year, self.month, self.day = map(int, date_pattern.split("-"))
    # Range checks only: the day is not validated against the month length.
    assert self.year >= 0
    assert 1 <= self.month <= 12
    assert 1 <= self.day <= 31

  @staticmethod
  def _key_of(obj):
    """Return (year, month, day) of ``obj``, or None if it is not date-like."""
    try:
      return (obj.year, obj.month, obj.day)
    except AttributeError:
      return None

  def __repr__(self):
    return "Date('{}-{}-{}')".format(self.year, self.month, self.day)

  def __str__(self):
    # Rendered as day-month-year, without zero padding, plus a fixed midnight
    # time component.
    return "{}-{}-{} 00:00:00".format(self.day, self.month, self.year)

  def __hash__(self):
    # Defined together with __eq__; without it the class would be unhashable.
    return hash(Date._key_of(self))

  def __lt__(self, other):
    other_key = Date._key_of(other)
    if other_key is None:
      return NotImplemented
    return Date._key_of(self) < other_key

  def __le__(self, other):
    other_key = Date._key_of(other)
    if other_key is None:
      return NotImplemented
    return Date._key_of(self) <= other_key

  def __gt__(self, other):
    other_key = Date._key_of(other)
    if other_key is None:
      return NotImplemented
    return Date._key_of(self) > other_key

  def __ge__(self, other):
    other_key = Date._key_of(other)
    if other_key is None:
      return NotImplemented
    return Date._key_of(self) >= other_key

  def __eq__(self, other):
    other_key = Date._key_of(other)
    if other_key is None:
      return NotImplemented
    return Date._key_of(self) == other_key

  def __ne__(self, other):
    other_key = Date._key_of(other)
    if other_key is None:
      return NotImplemented
    return Date._key_of(self) != other_key

In [None]:
from collections import OrderedDict

class DailyStock:
  """One daily price record of a single stock symbol.

  The numeric fields are stored as floats; -1 is the placeholder used for a
  value that was not supplied. Ordering and equality compare the dates only
  and assert that both records belong to the same symbol.
  """

  # JSON keys of one daily entry, in the same order as the numeric constructor
  # parameters (open, high, low, close, adjusted_close, volume,
  # dividend_amount, split_coefficient).
  _JSON_KEYS = (
      '1. open',
      '2. high',
      '3. low',
      '4. close',
      '5. adjusted close',
      '6. volume',
      '7. dividend amount',
      '8. split coefficient',
  )

  def __init__(self, symbol, date, open=-1, high=-1, low=-1, close=-1, adjusted_close=-1, volume=-1, dividend_amount=-1, split_coefficient=-1):
    """Store the symbol, parse ``date`` into a Date and convert values to float.

    Args:
      symbol: identifier of the stock; stored unchanged.
      date: "<year>-<month>-<day>" string handed to ``Date``.
      open .. split_coefficient: numbers or numeric strings; each is passed
        through ``float()``. All default to -1.

    Raises:
      ValueError / TypeError: if a value cannot be converted by ``float()``.
    """
    self.symbol = symbol
    self.date = Date(date)
    self.open = float(open)
    self.high = float(high)
    self.low = float(low)
    self.close = float(close)
    self.adjusted_close = float(adjusted_close)
    self.volume = float(volume)
    self.dividend_amount = float(dividend_amount)
    self.split_coefficient = float(split_coefficient)

  def __str__(self):
    # NOTE(review): volume is not part of this rendering (nine fields are
    # formatted); confirm whether that omission is intended.
    return "{}, {}, {}, {}, {}, {}, {}, {}, {}".format(self.symbol, self.date, self.open, self.high, self.low, self.close, self.adjusted_close, self.dividend_amount, self.split_coefficient)

  # Comparisons look at the date only. Each one asserts that both records
  # carry the same symbol and raises AssertionError otherwise.

  def __lt__(self, other):
    assert self.symbol == other.symbol
    return self.date < other.date

  def __le__(self, other):
    assert self.symbol == other.symbol
    return self.date <= other.date

  def __gt__(self, other):
    assert self.symbol == other.symbol
    return self.date > other.date

  def __ge__(self, other):
    assert self.symbol == other.symbol
    return self.date >= other.date

  def __eq__(self, other):
    assert self.symbol == other.symbol
    return self.date == other.date

  def __ne__(self, other):
    assert self.symbol == other.symbol
    return self.date != other.date

  @staticmethod
  def create_columns():
    """Return the column names matching the tuple built by ``create_tuple``."""
    # NOTE(review): 'Volumne' looks like a typo of 'Volume'. It is left as is
    # because it ends up as a header in the exported CSV files and consumers
    # may rely on the current spelling.
    return ['Date', 'Symbol', 'Year', 'Month', 'Day', 'Open', 'High', 'Low', 'Close',
            'Adjusted Close', 'Volumne', 'Dividend Amount', 'Split Coefficient']

  @staticmethod
  def _legacy_global(name):
    """Look up ``name`` among the module-level variables.

    Raises NameError when it is not defined, like a plain name lookup would.
    """
    try:
      return globals()[name]
    except KeyError:
      raise NameError("name '{}' is not defined".format(name)) from None

  @staticmethod
  def _extract_values(json_object):
    """Return the eight raw values of a daily entry, -1 for each missing key."""
    return [json_object.get(key, -1) for key in DailyStock._JSON_KEYS]

  @staticmethod
  def parse_from_json(json_object: dict, symbol=None, date=None):
    """Build a DailyStock from one daily entry of the JSON time series.

    Args:
      json_object: mapping holding the keys listed in ``_JSON_KEYS``; a
        missing key is replaced by -1.
      symbol: stock symbol of the record.
      date: "<year>-<month>-<day>" string of the record.

    ``symbol`` and ``date`` should be passed explicitly. When one of them is
    omitted, the module-level variable of the same name is used instead, which
    keeps existing one-argument calls working.

    Raises:
      NameError: if ``symbol``/``date`` is omitted and no module-level
        variable of that name exists.
    """
    if symbol is None:
      symbol = DailyStock._legacy_global('symbol')
    if date is None:
      date = DailyStock._legacy_global('date')
    return DailyStock(symbol, date, *DailyStock._extract_values(json_object))

  def create_tuple(self):
    """Return the record as a tuple ordered like ``create_columns()``."""
    return (str(self.date), self.symbol, self.date.year, self.date.month, self.date.day,
            self.open, self.high, self.low, self.close, self.adjusted_close,
            self.volume, self.dividend_amount, self.split_coefficient)


# Read JSON files and save into .csv files

In [None]:
# Collect the stock symbols, one per line, from the nine chunk files
# chunk0.txt .. chunk8.txt, in file order.
symbol_list = []

for chunk_number in range(9):
  chunk_path = '/content/drive/MyDrive/BigData/SymbolList/chunk{}.txt'.format(chunk_number)
  with open(chunk_path, 'r') as f:
    # Only newline characters are stripped from each line.
    symbol_list.extend(line.strip('\n') for line in f)


In [None]:
import json
 
# Convert the JSON time series of each symbol into a Spark DataFrame, export it
# as a per-symbol CSV file and keep the DataFrame in `df` for the combined
# export.
columns = DailyStock.create_columns()
df = []

# NOTE(review): only the first two symbols are converted ([:2]); widen the
# slice to process the whole list.
for symbol in symbol_list[:2]:
  json_path = "/content/drive/MyDrive/BigData/{}.json".format(symbol)
  # Each file is opened right where it is used and closed again by the `with`
  # block. This keeps every file paired with its own symbol: a missing file
  # only skips that symbol instead of shifting the data of the following ones.
  try:
    with open(json_path, "r") as json_file:
      data = json.load(json_file)
  except OSError:
    # Best effort: symbols without a readable JSON file are skipped.
    continue

  if 'Time Series (Daily)' not in data:
    continue
  daily_series = data['Time Series (Daily)']

  ds_list = []
  # The loop variables must stay named `symbol` and `date`: parse_from_json is
  # called with the daily entry only and takes the symbol and the date from
  # the module-level variables of these names.
  for date in daily_series:
    ds_list.append(DailyStock.parse_from_json(daily_series[date]))

  # Newest record first.
  ds_list.sort(key=lambda ds: ds.date, reverse=True)
  symbol_df = spark.createDataFrame([ds.create_tuple() for ds in ds_list], columns)
  symbol_df.toPandas().to_csv('/content/drive/MyDrive/BigDataTest/{}.csv'.format(symbol), index=False)
  df.append(symbol_df)
  symbol_df.unpersist()


In [None]:
import functools

# Merge the per-symbol DataFrames into a single one by chaining union() from
# left to right, export it as one CSV file and show the total row count.
def _union_pair(left, right):
  return left.union(right)

# From here on `df` is the combined DataFrame, no longer the list of parts.
df = functools.reduce(_union_pair, df)
df.toPandas().to_csv('/content/drive/MyDrive/BigDataTest/StockData.csv', index=False)
# Last expression of the cell: the row count is rendered as the cell output.
df.count()

3484