In [71]:
import json, time
import pyspark
import os
import re

In [72]:
from pyspark.sql.types import DataType, BooleanType, NullType, IntegerType, StringType, MapType

In [73]:
from pyspark.sql.functions import udf, explode, to_date

# Getting file names

In [74]:
DIR = 'archive'
dir_list = []
dir_list += [os.path.join(DIR,file) for file in os.listdir(DIR) if os.path.isdir(os.path.join(DIR, file))]
dir_list.sort()

In [75]:
# Get latest quarter
quarter_folder = dir_list[-2]

In [76]:
json_files = []
json_files += [os.path.join(quarter_folder, file) for file in os.listdir(quarter_folder)]

In [77]:
json_file = json_files[0]
json_file

'archive/2022.QTR2/0001493152-22-013349.json'

# Initialize PySpark session

In [78]:
spark = pyspark.sql.SparkSession.builder \
    .appName("Decode_json_files") \
    .config("spark.jars", "postgresql-42.5.1.jar") \
    .getOrCreate()

# Data Exploration

In [79]:
%time
df = spark.read.option("multiline", "true").json(json_file)

CPU times: user 2 µs, sys: 1 µs, total: 3 µs
Wall time: 24.1 µs


In [80]:
df.printSchema()

root
 |-- data: struct (nullable = true)
 |    |-- bs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- concept: string (nullable = true)
 |    |    |    |-- label: string (nullable = true)
 |    |    |    |-- unit: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |-- cf: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- concept: string (nullable = true)
 |    |    |    |-- label: string (nullable = true)
 |    |    |    |-- unit: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |-- ic: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- concept: string (nullable = true)
 |    |    |    |-- label: string (nullable = true)
 |    |    |    |-- unit: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |-- endDate: string (nullable = true)
 |-- quarter: string

### Balance sheet spark dataframe

In [81]:
df_bs = df.withColumn("bs", explode('data.bs')).select('bs')

In [82]:
df_bs.printSchema()

root
 |-- bs: struct (nullable = true)
 |    |-- concept: string (nullable = true)
 |    |-- label: string (nullable = true)
 |    |-- unit: string (nullable = true)
 |    |-- value: string (nullable = true)



In [83]:
df_bs = df_bs.select('bs.concept', 'bs.label', 'bs.unit', 'bs.value').distinct()

In [84]:
df_bs.show()

+--------------------+--------------------+----+----------+
|             concept|               label|unit|     value|
+--------------------+--------------------+----+----------+
|LiabilitiesNoncur...|Total long-term l...| usd|    253928|
|       AssetsCurrent|Total current assets| usd|  21649936|
| NotesPayableCurrent|                    | usd|    122175|
|RetainedEarningsA...|                    | usd|-177309522|
|AdditionalPaidInC...|                    | usd| 195077466|
|LiabilitiesAndSto...|Total liabilities...| usd|  22203742|
|OtherReceivablesN...|                    | usd|       N/A|
| PreferredStockValue|                    | usd|   2656713|
|CashAndCashEquiva...|                    | usd|  21372463|
|              Assets|        Total assets| usd|  22203742|
|  LiabilitiesCurrent|Total current lia...| usd|   1408762|
|OperatingLeaseLia...|Operating lease l...| usd|    253928|
|OperatingLeaseLia...|                    | usd|    192535|
|    CommonStockValue|                  

### Cash flow spark dataframe

In [85]:
df_cf = df.withColumn("bs", explode('data.cf')).select('bs')

In [86]:
df_cf.printSchema()

root
 |-- bs: struct (nullable = true)
 |    |-- concept: string (nullable = true)
 |    |-- label: string (nullable = true)
 |    |-- unit: string (nullable = true)
 |    |-- value: string (nullable = true)



In [87]:
df_cf = df_cf.select('bs.concept', 'bs.label', 'bs.unit', 'bs.value').distinct()

In [88]:
df_cf.show()

+--------------------+--------------------+----+--------+
|             concept|               label|unit|   value|
+--------------------+--------------------+----+--------+
|       NetIncomeLoss|                    | usd|-6035394|
|NetCashProvidedBy...|Net cash used in ...| usd|-5636952|
|NetCashProvidedBy...|Net cash used in ...| usd|  -75047|
|ProceedsFromWarra...|                    | usd|     N/A|
|DepreciationAndAm...|                    | usd|    8468|
|ProceedsFromIssua...|                    | usd|     N/A|
|IncreaseDecreaseI...|Prepaid expenses ...| usd| -157226|
|     InterestPaidNet|                    | usd|    3246|
|PaymentsForRepurc...|Redemption of Ser...| usd|     N/A|
|PaymentsToAcquire...|Purchase of prope...| usd|   87047|
|IncreaseDecreaseI...|Accounts payable ...| usd|  146478|
|GainLossOnSaleOfP...|Gain on sale of p...| usd|   10964|
|ProceedsFromSaleO...|                    | usd|   12000|
|RepaymentsOfShort...|Payments on short...| usd|  181241|
|OGEN:StockDiv

### Income statement spark dataframe

In [89]:
df_ic = df.withColumn("ic", explode('data.ic')).select('ic')

In [90]:
df_ic.printSchema()

root
 |-- ic: struct (nullable = true)
 |    |-- concept: string (nullable = true)
 |    |-- label: string (nullable = true)
 |    |-- unit: string (nullable = true)
 |    |-- value: string (nullable = true)



In [91]:
df_ic = df_ic.select('ic.concept', 'ic.label', 'ic.unit', 'ic.value').distinct()

In [92]:
df_ic.show()

+--------------------+--------------------+----------+---------+
|             concept|               label|      unit|    value|
+--------------------+--------------------+----------+---------+
|OGEN:LocalBusines...|  Local business tax|       usd|      490|
|IncomeTaxExpenseB...|                    |       usd|      N/A|
|GeneralAndAdminis...|                    |       usd|  1331549|
| OperatingIncomeLoss|Loss from operations|       usd| -6054528|
|OtherNonoperating...|                    |       usd|    10964|
|EarningsPerShareB...|                    |usd/shares|    -0.05|
|WeightedAverageNu...|                    |    shares|116394806|
|RevenueFromContra...|                    |       usd|    15083|
|ResearchAndDevelo...|                    |       usd|  4738062|
|NonoperatingIncom...|Total other incom...|       usd|    19134|
|IncomeLossFromCon...|Loss before incom...|       usd| -6035394|
|InvestmentIncomeI...|                    |       usd|    11906|
|     InterestExpense|   

### General information about the company

In [93]:
df.printSchema()

root
 |-- data: struct (nullable = true)
 |    |-- bs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- concept: string (nullable = true)
 |    |    |    |-- label: string (nullable = true)
 |    |    |    |-- unit: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |-- cf: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- concept: string (nullable = true)
 |    |    |    |-- label: string (nullable = true)
 |    |    |    |-- unit: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |-- ic: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- concept: string (nullable = true)
 |    |    |    |-- label: string (nullable = true)
 |    |    |    |-- unit: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |-- endDate: string (nullable = true)
 |-- quarter: string

In [94]:
df = df.withColumn('startDate', to_date(df.startDate, 'yyyy-MM-dd'))
df = df.withColumn('endDate',to_date(df.endDate, 'yyyy-MM-dd'))
df = df.withColumn('year', df.year.cast('int'))

In [95]:
year = df.select('year').collect()[0]#show()

In [96]:
df.show()

+--------------------+----------+-------+----------+------+----+
|                data|   endDate|quarter| startDate|symbol|year|
+--------------------+----------+-------+----------+------+----+
|{[{AssetsCurrent,...|2022-03-31|     Q1|2022-01-01|  OGEN|2022|
+--------------------+----------+-------+----------+------+----+



# Track one company over time

In [97]:
def get_json_in_quarter(dir_quarter):
    json_files = []
    json_files += [os.path.join(quarter_folder, file) for file in os.listdir(quarter_folder)]
    return json_files

In [98]:
def get_ticker_in_json_pyspark(json_file):
    try:
        df = spark.read.option("multiline", "true").json(json_file)
        return df.select("symbol").collect()[0][0]
    except NameError:
        print('Please initialize Spark session in variable called "spark"')

Let's get a random ticker so that one can see how 

In [99]:
def get_ticker_in_json(json_file):
    with open(json_file, 'r') as jf:
        df = json.load(jf)
    return df['symbol']

### Create database

In [100]:
import psycopg2
import configparser

In [101]:
config = configparser.ConfigParser()
config.read('config.ini')

['config.ini']

In [102]:
DB_NAME = config['DB']['name']
DB_USER = config['DB']['user']
DB_PASS = config['DB']['pass']
DB_HOST = config['DB']['host']
DB_PORT = config['DB']['port']

In [103]:
try:
    conn = psycopg2.connect(database = DB_NAME, user = DB_USER, 
                            password = DB_PASS, host = DB_HOST,
                            port = DB_PORT)
    print("Connection to database successful")
except:
    print("Connection to database failed")

Connection to database successful


In [104]:
cur = conn.cursor()

In [105]:
### Get lists to be implemented in the database

In [117]:
def get_year_qtr_of_dir(filedir):
    year = int(re.search('archive/(.*).QTR', filedir).group(1))
    qtr = int(re.search('QTR(.*)', filedir).group(1))
    return year, qtr

In [118]:
filedir = dir_list[0]
year, qtr = get_year_qtr_of_dir(filedir)

In [119]:
json_files = get_json_in_quarter(filedir)

In [120]:
tickers = []
for json_file in json_files[:100]:
    tickers.append(get_ticker_in_json(json_file))
tickers_id = list(enumerate(tickers))

In [137]:
type(tickers_id[0][1])

str

In [168]:
cur.execute("""
            DROP TABLE IF EXISTS summary_archive;
            CREATE TABLE IF NOT EXISTS summary_archive (
            year INT NOT NULL,
            quarter INT NOT NULL, 
            symbol VARCHAR ( 50 ) NOT NULL,
            id INT NOT NULL
            )
            """)

In [169]:
cur.execute("""
            INSERT INTO summary_archive (year, quarter, symbol, id)
            VALUES (%s, %s, %s, %s);
            """, 
            (year, qtr, tickers_id[0][1], tickers_id[0][0]))

In [170]:
cur.execute("SELECT * FROM summary_archive")
a = cur.fetchall()
a

[(2009, 2, 'OGEN', 0)]

In [171]:
conn.commit()

### BONUS: How to access a DB from pyspark 