In [29]:
from datetime import datetime
import os
import configparser
import pandas as pd
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc

In [2]:
def create_spark_session(aws_access_key=None, aws_secret_key=None):
    conf = SparkConf()
    conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.0')
    conf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider')
    if (aws_access_key is not None) & (aws_secret_key is not None): 
        conf.set('spark.hadoop.fs.s3a.access.key', aws_access_key)
        conf.set('spark.hadoop.fs.s3a.secret.key', aws_secret_key)
    
    spark = SparkSession \
        .builder \
        .config(conf=conf) \
        .getOrCreate()
    return spark

### Create Spark Session

In [3]:
spark = create_spark_session()

### Raw Data Frames

In [4]:
meta_path = './input_data/coin_meta_data.csv'
prices_path = './input_data/coin_price_data/'
gtrends_path = './input_data/google_trends_data/'


pd_raw_meta_df = pd.read_csv(meta_path)

raw_meta_df = spark.createDataFrame(pd_raw_meta_df.where(pd.notnull(pd_raw_meta_df), None))
raw_prices_df = spark.read.csv(prices_path, header=True)
raw_gtrends_df = spark.read.csv(gtrends_path, header=True)

raw_meta_shape = (raw_meta_df.count(), len(raw_meta_df.columns))
raw_prices_shape = (raw_prices_df.count(), len(raw_prices_df.columns))
raw_gtrends_shape = (raw_gtrends_df.count(), len(raw_gtrends_df.columns))

print(f'Raw Meta Data Schema - {raw_meta_shape}')
raw_meta_df.printSchema()
print(f'Raw Prices Schema - {raw_prices_shape}')
raw_prices_df.printSchema()
print(f'Raw Google Trends Schema - {raw_gtrends_shape}')
raw_gtrends_df.printSchema()

Raw Meta Data Schema - (52, 10)
root
 |-- id: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- name: string (nullable = true)
 |-- block_time_in_minutes: long (nullable = true)
 |-- hashing_algorithm: string (nullable = true)
 |-- genesis_date: string (nullable = true)
 |-- twitter_screen_name: string (nullable = true)
 |-- subreddit_url: string (nullable = true)
 |-- description: string (nullable = true)
 |-- github_url: string (nullable = true)

Raw Prices Schema - (786669, 5)
root
 |-- date: string (nullable = true)
 |-- price_usd: string (nullable = true)
 |-- mcap_usd: string (nullable = true)
 |-- volume_usd: string (nullable = true)
 |-- coin_id: string (nullable = true)

Raw Google Trends Schema - (194080, 5)
root
 |-- _c0: string (nullable = true)
 |-- date: string (nullable = true)
 |-- keyword_interest: string (nullable = true)
 |-- keyword: string (nullable = true)
 |-- coin_id: string (nullable = true)



## Final Data Model

In [9]:
coin_metrics_path = './output_data/coin_metrics'
coins_path = './output_data/coins'
google_trends_path = './output_data/google_trends'
time_path = './output_data/time'

coin_metrics_df = spark.read.parquet(coin_metrics_path, header=True)
coins_df = spark.read.parquet(coins_path, header=True)
google_trends_df = spark.read.parquet(google_trends_path, header=True)
time_df = spark.read.parquet(time_path, header=True)

coin_metrics_shape = (coin_metrics_df.count(), len(coin_metrics_df.columns))
coins_shape = (coins_path_df.count(), len(coins_path_df.columns))
google_trends_shape = (google_trends_df.count(), len(google_trends_df.columns))
time_shape = (time_df.count(), len(time_df.columns))

print(f'Coin Metrics Schema - {coin_metrics_shape}')
coin_metrics_df.printSchema()
print(f'Google Trends Schema - {google_trends_shape}')
google_trends_df.printSchema()
print(f'Coins Schema - {coins_shape}')
coins_path_df.printSchema()
print(f'Time Schema - {time_shape}')
time_df.printSchema()

Coin Metrics Schema - (786669, 6)
root
 |-- coin_id: string (nullable = true)
 |-- recorded_at: timestamp (nullable = true)
 |-- currency: string (nullable = true)
 |-- price: double (nullable = true)
 |-- market_cap: double (nullable = true)
 |-- volume: double (nullable = true)

Google Trends Schema - (193472, 4)
root
 |-- coin_id: string (nullable = true)
 |-- recorded_at: timestamp (nullable = true)
 |-- keyword: string (nullable = true)
 |-- trend_value: integer (nullable = true)

Coins Schema - (52, 7)
root
 |-- coin_id: string (nullable = true)
 |-- ticker: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- twitter_account: string (nullable = true)
 |-- subreddit_url: string (nullable = true)
 |-- github_url: string (nullable = true)

Time Schema - (791016, 7)
root
 |-- recorded_at: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-

## Final Model Tables

In [6]:
coin_metrics_df.show()

+-----------+--------------------+--------+------------------+--------------------+--------------------+
|    coin_id|         recorded_at|currency|             price|          market_cap|              volume|
+-----------+--------------------+--------+------------------+--------------------+--------------------+
|binancecoin|2018-06-19 23:22:...|     usd|16.542910930091654|1.8865749046342008E9| 7.170711267473339E7|
|binancecoin|2018-06-30 02:45:...|     usd|14.660313618183396|1.6718810784275064E9|  6.46723625395231E7|
|binancecoin|2018-07-06 05:20:...|     usd|13.579914721855094| 1.548670994457347E9| 3.634828318429303E7|
|binancecoin|2018-07-09 12:13:...|     usd|13.556972923067582|1.5460546821261861E9| 5.151600845765645E7|
|binancecoin|2018-07-12 00:11:...|     usd|12.503530626770281| 1.425918763600528E9| 4.647439893762504E7|
|binancecoin|2018-07-21 10:00:...|     usd|11.722658054298263|1.1493045191559227E9|3.5216468677121826E7|
|binancecoin|2018-07-29 09:57:...|     usd|14.366882095

In [7]:
google_trends_df.show()

+----------------+-------------------+---------+-----------+
|         coin_id|        recorded_at|  keyword|trend_value|
+----------------+-------------------+---------+-----------+
|crypto-com-chain|2021-06-05 07:00:00|cro token|         30|
|crypto-com-chain|2021-06-06 18:00:00|cro token|         23|
|crypto-com-chain|2021-06-21 18:00:00|cro token|         26|
|crypto-com-chain|2021-06-29 22:00:00|cro token|         33|
|crypto-com-chain|2021-07-07 01:00:00|cro token|         24|
|crypto-com-chain|2021-07-12 14:00:00|cro token|         15|
|crypto-com-chain|2021-07-26 17:00:00|cro token|         24|
|crypto-com-chain|2021-10-02 12:00:00|cro token|          0|
|crypto-com-chain|2021-10-07 18:00:00|cro token|         31|
|crypto-com-chain|2021-10-09 05:00:00|cro token|         61|
|crypto-com-chain|2021-11-01 09:00:00|cro token|         11|
|crypto-com-chain|2021-11-15 01:00:00|cro token|         84|
|crypto-com-chain|2021-11-20 12:00:00|cro token|         22|
|crypto-com-chain|2021-1

In [10]:
coins_df.show()

+----------------+------+---------------+--------------------+---------------+--------------------+--------------------+
|         coin_id|ticker|           name|         description|twitter_account|       subreddit_url|          github_url|
+----------------+------+---------------+--------------------+---------------+--------------------+--------------------+
|        algorand|  algo|       Algorand|Algorand is a sca...|       algorand|https://www.reddi...|https://github.co...|
|       ftx-token|   ftt|      FTX Token|FTT is FTX's exch...|   FTX_official|                null|                null|
|            tron|   trx|           TRON|What is Tron?\r\n...| tronfoundation|https://www.reddi...|https://github.co...|
|             okb|   okb|            OKB|<a href="https://...|           OKEx|                null|                null|
|    bitcoin-cash|   bch|   Bitcoin Cash|Bitcoin Cash is a...|           null|https://www.reddi...|https://github.co...|
|          fantom|   ftm|       

In [11]:
time_df.show()

+--------------------+----+---+----+-----+----+-------+
|         recorded_at|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2020-04-15 05:24:...|   5| 15|  16|    4|2020|      4|
|2020-09-04 16:02:...|  16|  4|  36|    9|2020|      6|
|2018-08-08 11:34:...|  11|  8|  32|    8|2018|      4|
|2018-08-19 05:50:...|   5| 19|  33|    8|2018|      1|
|2021-09-15 21:05:...|  21| 15|  37|    9|2021|      4|
|2020-02-13 05:09:...|   5| 13|   7|    2|2020|      5|
|2022-01-30 14:01:...|  14| 30|   4|    1|2022|      1|
|2018-10-18 20:13:...|  20| 18|  42|   10|2018|      5|
|2019-02-27 08:00:...|   8| 27|   9|    2|2019|      4|
|2019-10-19 01:02:...|   1| 19|  42|   10|2019|      7|
|2021-05-09 06:02:...|   6|  9|  18|    5|2021|      1|
|2021-07-17 13:02:...|  13| 17|  28|    7|2021|      7|
|2018-10-02 06:54:...|   6|  2|  40|   10|2018|      3|
|2018-12-09 01:02:...|   1|  9|  49|   12|2018|      1|
|2018-09-12 16:26:...|  16| 12|  37|    9|2018| 

## Example Query
- below I will show an example of how we can get scores and prices per hour for a specific keyword type

In [26]:
token_keywords = google_trends_df.where(col('keyword').contains('token'))\
        .join(time_df, ['recorded_at'], 'inner')

out_df = metric_token_score_df = coin_metrics_df.join(time_df, coin_metrics_df.recorded_at == time_df.recorded_at, 'inner')\
        .join(token_keywords, (time_df.year == token_keywords.year)&(time_df.month == token_keywords.month)&(time_df.day == token_keywords.day) & (coin_metrics_df.coin_id == token_keywords.coin_id))\
        .join(coins_df, coin_metrics_df.coin_id == coins_df.coin_id, 'inner').select(coins_df.coin_id, 'ticker', 'name', 'keyword', token_keywords.recorded_at, 'trend_value', 'currency','price','market_cap', 'volume')

In [34]:
out_df.orderBy('recorded_at').show(100)

+----------------+------+---------------+----------+-------------------+-----------+--------+--------------------+--------------------+--------------------+
|         coin_id|ticker|           name|   keyword|        recorded_at|trend_value|currency|               price|          market_cap|              volume|
+----------------+------+---------------+----------+-------------------+-----------+--------+--------------------+--------------------+--------------------+
|              ki|   xki|             KI| xki token|2021-06-01 00:00:00|          0|     usd| 0.20836257659667973|                 0.0|   264324.5248757013|
|     persistence|  xprt|    Persistence|xprt token|2021-06-01 00:00:00|          0|     usd|  11.400455719533895|2.1090110593557045E8|  1221140.5283082407|
|        terrausd|   ust|       TerraUSD| ust token|2021-06-01 00:00:00|         69|     usd|  0.9993063246858573|1.9461810255603673E9| 5.579141266727159E7|
|crypto-com-chain|   cro|Crypto.com Coin| cro token|2021-0