In [None]:
import findspark
import os
import pandas as pd
import psycopg2
import requests
import sys


from datetime import datetime
findspark.init()
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

#Include base directory
notebook_dir = Path(os.getcwd())
sys.path.append((notebook_dir.parent).as_posix())

from database_functions import (
    get_config,
    insert_values, 
    general_query)

In [None]:
# GLOBALS -- notebook-wide settings shared by the cells below.

# Name of the database table the LOAD step appends to.
TABLE_NAME_TARGET = "crypto_timeseries"

# Each section of config.ini is read once through get_config; the results
# are indexed by string key further down (presumably dict-like -- confirm
# against database_functions.get_config).
CONFIG_DB = get_config(filename="config.ini", section="crypto")
CONFIG_API = get_config(filename="config.ini", section="api")
CONFIG_SPARK = get_config(filename="config.ini", section="apache-spark")

# Application name passed to the SparkSession builder.
SPARK_SESSION_NAME = "main_etl_pyspark"

# Column name -> Spark SQL type name; every listed column is cast to its
# type during the TRANSFORM step.
COLUMN_DATATYPES = dict(
    timestamp="timestamp",
    id="string",
    rank="int",
    symbol="string",
    name="string",
    supply="double",
    maxsupply="double",
    marketcapusd="double",
    volumeusd24hr="double",
    priceusd="double",
    changepercent24hr="double",
    vwap24hr="double",
    explorer="string",
)


In [None]:
# Start the Spark session for this ETL run (getOrCreate reuses an active
# session if one already exists in this kernel).
print("\nInit Spark session..")
spark = (
    SparkSession.builder
    .appName(SPARK_SESSION_NAME)
    # Path of the JDBC driver jar comes from config.ini. The key spelling
    # 'postresql_jar' must match the config file, so it is left untouched.
    .config("spark.jars", CONFIG_SPARK['postresql_jar'])
    .getOrCreate()
)

# Read the UI address through the public SparkContext.uiWebUrl property
# instead of the private Py4J handle (spark._jsc), which is not part of the
# supported API.
print(f"\nSpark is running at: \n{spark.sparkContext.uiWebUrl}")

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 53830)
Traceback (most recent call last):
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/socketserver.py", line 747, in __init__
    self.handle()
  File "/opt/homebrew/Cellar/apache-spark/3.5.4/libex

In [None]:
##----> EXTRACT <----- ##
print("\nEXTRACT from API query..")

# Upper bound (seconds) on how long to wait for the API. Without a timeout
# requests.get can block forever and stall the whole notebook run.
API_TIMEOUT_SECONDS = 30

# The URL template from config.ini has an {api_key} placeholder that is
# filled in from the same config section.
response = requests.get(
    url=CONFIG_API['url'].format(api_key=CONFIG_API['api_key']),
    timeout=API_TIMEOUT_SECONDS,
)

# Stop here on a 4xx/5xx answer rather than handing an error payload to the
# TRANSFORM step, where the failure would be harder to diagnose.
response.raise_for_status()

In [None]:
##----> TRANSFORM <----- ##
print("\nTRANSFORM..")

# API JSON response -> pandas DataFrame, one row per record under 'data'.
responseData = response.json()
df = pd.json_normalize(data=responseData, record_path='data')

# Stamp every row with the same extraction time. datetime.now() is a naive
# local-time value; it is stored as a datetime object (not a formatted
# string) so that Spark can cast it to its 'timestamp' type below.
current_timestamp = datetime.now()
df['timestamp'] = [current_timestamp] * df.shape[0]

# Lowercase all column names so they line up with COLUMN_DATATYPES.
# Reassigning (instead of inplace=True) keeps the cell free of hidden
# in-place mutation.
rename_cols_dict = {c: c.lower() for c in df.columns.tolist()}
df = df.rename(columns=rename_cols_dict)

# Drop the flattened 'tokens.*' columns.
df = df.loc[:, ~df.columns.str.startswith("tokens.")]

# pandas DataFrame --> Spark DataFrame
dfs = spark.createDataFrame(df)

# Enforce the expected Spark data type on every configured column.
for column_name, data_type in COLUMN_DATATYPES.items():
    dfs = dfs.withColumn(column_name, col(column_name).cast(data_type))

In [None]:
##----> LOAD <----- ##
print("\nLOAD..")

# JDBC endpoint of the PostgreSQL database, assembled from config.ini values.
url_db = "jdbc:postgresql://{host}:{port}/{database}".format(
    host=CONFIG_DB['host'],
    port=CONFIG_DB['port'],
    database=CONFIG_DB['database'],
)

# Connection properties handed to the JDBC writer; the credentials are
# rendered as text, the driver class is fixed.
properties_dbspark = dict(
    user=format(CONFIG_DB['user']),
    password=format(CONFIG_DB['password']),
    driver="org.postgresql.Driver",
)

# Append the Spark DataFrame's rows to the target PostgreSQL table.
jdbc_writer = dfs.write.mode("append")
jdbc_writer.jdbc(url=url_db, table=TABLE_NAME_TARGET, properties=properties_dbspark)

25/07/30 12:16:32 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1025817 ms exceeds timeout 120000 ms
25/07/30 12:16:32 WARN SparkContext: Killing executors is not supported by current scheduler.
25/07/30 12:16:36 INFO Executor: Told to re-register on heartbeat
25/07/30 12:16:36 INFO BlockManager: BlockManager BlockManagerId(driver, 10.74.70.95, 53805, None) re-registering with master
25/07/30 12:16:36 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.74.70.95, 53805, None)
25/07/30 12:16:36 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv