In [None]:
import os

# PySpark reads PYSPARK_SUBMIT_ARGS when it launches its JVM gateway, so this
# cell must run before any SparkSession is created.
# The Cassandra host can be overridden through the CASSANDRA_HOST environment
# variable; the default is the address that was previously hard-coded here.
SPARK_PACKAGES = ",".join([
    "com.datastax.spark:spark-cassandra-connector_2.12:3.4.0",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1",
])
CASSANDRA_HOST = os.environ.get("CASSANDRA_HOST", "192.168.112.4")

os.environ['PYSPARK_SUBMIT_ARGS'] = (
    f"--packages {SPARK_PACKAGES} "
    f"--conf spark.cassandra.connection.host={CASSANDRA_HOST} "
    "pyspark-shell"
)

In [None]:
import json

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, FloatType, DoubleType, ArrayType
from pyspark.sql.functions import col, explode

# Local Spark session (12 worker threads) used for the BTC insertion job.
spark = (
    SparkSession.builder
    .appName("btc_insertion")
    .master('local[12]')
    .getOrCreate()
)

In [None]:
# Array schema for one interval's payload: each field holds a whole series.
# NOTE(review): ot/ct presumably are open/close times and o/h/l/c/v the OHLC
# prices and volume (inferred from the names only) — confirm against the dataset.
# DoubleType instead of FloatType: a 32-bit float keeps only ~7 significant
# digits, so epoch timestamps (~1e9 s or ~1e12 ms) get rounded to steps of about
# two minutes, and neighbouring candles can collapse onto the same timestamp.
SERIES_COLUMNS = ("ot", "ct", "o", "h", "l", "c", "v")
input_schema = StructType([
    StructField(name, ArrayType(DoubleType())) for name in SERIES_COLUMNS
])

# Load the raw dataset; the context manager closes the file handle.
# NOTE(review): expected to be a dict of interval -> {column: [values]} — confirm.
with open('../data/dataset.json', 'r') as dataset_file:
    data = json.load(dataset_file)

In [None]:
# Batch insertion into Cassandra.
# For every interval key in `data`, the parallel series are sliced into windows
# of `batch_size` candles. Each window becomes a DataFrame with one row per
# candle and is appended to the table `bitcoin_chart.raw_<interval>`.

# NOTE: a SparkSession already exists from an earlier cell, so getOrCreate()
# returns that session; presumably the app name set here does not take effect.
conf = SparkConf().setAppName("df_cassandra")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

batch_size = 500  # candles per DataFrame written to Cassandra

# Flat per-row schema: one scalar column per array field of `input_schema`,
# so names, order and element types stay in sync with it.
row_schema = StructType([
    StructField(field.name, field.dataType.elementType)
    for field in input_schema.fields
])
columns = row_schema.names


def _to_float(value):
    """Return ``value`` as a float (JSON ints included); ``None`` stays ``None``."""
    return None if value is None else float(value)


def _iter_row_batches(series, names, size):
    """Yield lists of row tuples with at most ``size`` rows each.

    Args:
        series: mapping of column name -> list of values; row ``i`` is built
            from the ``i``-th element of each list named in ``names``.
        names: column names, in the order the tuple fields should appear.
        size: maximum number of rows per yielded batch.

    Raises:
        ValueError: if the named lists do not all have the same length
            (rows would otherwise be silently misaligned or truncated).
    """
    lengths = {name: len(series[name]) for name in names}
    if len(set(lengths.values())) > 1:
        raise ValueError(f"series have mismatched lengths: {lengths}")
    n_rows = next(iter(lengths.values()), 0)
    for start in range(0, n_rows, size):
        window = [
            [_to_float(v) for v in series[name][start:start + size]]
            for name in names
        ]
        # Transpose the column slices into one tuple per candle, keeping the
        # columns aligned (a chain of explode() calls would cross-join them).
        yield list(zip(*window))


for interval, series in data.items():
    for rows in _iter_row_batches(series, columns, batch_size):
        # NOTE(review): casting a number to Timestamp interprets it as seconds
        # since the epoch; if ot/ct are stored in milliseconds they must be
        # divided by 1000 first — confirm against the dataset.
        batch_df = (
            spark.createDataFrame(rows, schema=row_schema)
            .withColumn('ot', col('ot').cast("Timestamp"))
            .withColumn('ct', col('ct').cast("Timestamp"))
        )
        (
            batch_df.write
            .format("org.apache.spark.sql.cassandra")
            .options(table=f'raw_{interval}', keyspace='bitcoin_chart')
            .mode('append')
            .save()
        )
    