In [56]:
# Point findspark at the local Spark installation. This runs before the
# `import pyspark` further down (presumably so that import can be resolved
# from the Spark install -- confirm against the findspark documentation).
import findspark, os
findspark.init('/home/hadoop/spark')

# Enable horizontal scroll: the injected CSS stops <pre> output from
# wrapping, so wide printed tables keep one record per line.
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import col, expr, explode, concat_ws

# Stop the context left over from a previous run of this cell, if there is
# one. On a fresh kernel `sc` is not bound yet, and an unguarded `sc.stop()`
# would abort the whole cell with NameError before Spark is ever started.
if 'sc' in globals():
    sc.stop()

# Initialize Spark Context
sc = pyspark.SparkContext()
spark = SparkSession(sc)

# Read raw cards from HDFS.
# The path is a plain string literal: it has no placeholders, so it needs no
# f-prefix.
# NOTE(review): `nullValue` is documented as a CSV reader option; confirm it
# has any effect when the format is JSON.
mtg_cards_df = (
    spark.read.format('json')
    .options(nullValue='')
    .load('/user/hadoop/mtg/raw/cards_2021-04-02.json')
)
mtg_cards_df.printSchema()
mtg_cards_df.show()

# Turn the `cards` array into one row per card, then promote the fields of
# each card struct to top-level columns.
mtg_cards_exploded_df = (
    mtg_cards_df
    .select(explode('cards').alias('exploded'))
    .select('exploded.*')
)
mtg_cards_exploded_df.printSchema()
mtg_cards_exploded_df.show()

# Replace null values with empty strings. Per the PySpark documentation a
# string fill value is applied to string columns only; columns of other
# types are ignored by this call.
mtg_cards_renamed_null_df = mtg_cards_exploded_df.fillna('')
mtg_cards_renamed_null_df.printSchema()
mtg_cards_renamed_null_df.show()

# Keep only the columns that are written out at the end.
columns = ['name', 'subtypes', 'text', 'flavor', 'artist']
reduced_cards = mtg_cards_renamed_null_df.select(columns)
reduced_cards.printSchema()
reduced_cards.show()

# Collapse `subtypes` (presumably an array of strings -- verify against the
# printed schema) into a single ', '-separated string column.
flattened_subtypes = reduced_cards.withColumn(
    'subtypes',
    concat_ws(', ', col('subtypes')),
)
flattened_subtypes.show()

#mtg_cards_raw_dataframe.select(col('cards').alias('cards'))\
#    .withColumn('cards', expr('transform(cards, card -> array(card.*))'))\
#    .select(col('cards').alias('result_cards'))\
#    .show()

#mtg_cards_raw_dataframe.select(*columns).show()

# Write data to HDFS as JSON; 'overwrite' mode replaces whatever already
# exists at the target path.
(
    flattened_subtypes.write
    .format('json')
    .mode('overwrite')
    .save('/user/hadoop/mtg/final')
)

root
 |-- cards: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- artist: string (nullable = true)
 |    |    |-- cmc: double (nullable = true)
 |    |    |-- colorIdentity: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- colors: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- flavor: string (nullable = true)
 |    |    |-- foreignNames: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- flavor: string (nullable = true)
 |    |    |    |    |-- imageUrl: string (nullable = true)
 |    |    |    |    |-- language: string (nullable = true)
 |    |    |    |    |-- multiverseid: long (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |-- id: string (nullabl