#Inserção no Banco de Dados Cassandra

Importações necessárias para realizar esse código:

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

Inicialização da sessão do Spark e conexão com o Cassandra via a API **JDBC**:

In [None]:
spark = SparkSession\
    .builder\
    .appName("Spark Exploration App")\
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.1.0")\
    .config("spark.sql.extensions","com.datastax.spark.connector.CassandraSparkExtensions") \
    .config("spark.cassandra.connection.host","34.151.229.216") \
    .config("spark.cassandra.connection.port","9042") \
    .getOrCreate()
    

In [None]:
keyspace = "desafio_final"

Método criado para carregar os dados presentes em uma determinada tabela de um Keyspace:

In [None]:
def loadData(table):
    df = spark.read \
        .format("org.apache.spark.sql.cassandra") \
        .option("keyspace", keyspace) \
        .option("table", table) \
        .load()
    return df

Método criado para inserir os dados de um DataFrame em uma determinada tabela e Keyspace:

In [None]:
def saveData(df, table):
    df.write \
        .format("org.apache.spark.sql.cassandra") \
        .option("keyspace", keyspace) \
        .option("table", table) \
        .mode('append') \
        .save()

Nessa parte do código estamos lendo os parquet cridos na etapa anterior e inseridos no Cloud Storage e transformandos os em DataFrames.

In [None]:
pib_agricola = spark.read.parquet('parquets/parquet_pib_agricola')
usa_usda = spark.read.parquet('parquets/parquet_usa_agricultura')
valor_producao = spark.read.parquet('parquets/parquet_valor_producao')
qnt_colheita = spark.read.parquet('parquets/quantidade_colheita')
exportacao_total = spark.read.parquet('parquets/total_exportacao')
exportacao_pais = spark.read.parquet('parquets/exportacao_pais')
exportacao_estado = spark.read.parquet('parquets/exportacao_estado')
exportacao_produto = spark.read.parquet('parquets/exportacao_produto')

Aqui estamos verificando se os DataFrames gerados com as leituras dos parquet estão corretos.

In [None]:
pib_agricola.printSchema()
usa_usda.printSchema()
valor_producao.printSchema()
qnt_colheita.printSchema()
qnt_colheita.count()
exportacao_total.printSchema()
exportacao_pais.printSchema()
exportacao_estado.printSchema() 
exportacao_produto.printSchema()

Nessa parte estamos utilizando o método saveData para inserir o conteúdo de cada DataFrame nas respectivas tabelas do BD Cassandra.

In [None]:
saveData(pib_agricola,'pib_agricola')
saveData(usa_usda,'usa_agricultura')
saveData(valor_producao,'valor_producao')
saveData(qnt_colheita,'quantidade_colheita')
saveData(exportacao_total,'total_exportacao')
saveData(exportacao_pais,'exportacao_pais')
saveData(exportacao_estado,'exportacao_estado')
saveData(exportacao_produto,'exportacao_produto')