In [8]:
# Import Libraries
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("read-csv-data")
         .master("local[*]")
         .config("spark.executor.memory", "2g")
         .getOrCreate())

spark.sparkContext.setLogLevel("ERROR")

In [9]:
# Read CSV data with inferred schema
df = (spark.read.format("csv")
      .option("header", "true")
      .load("data/netflix_titles.csv"))

In [10]:
# Display data content in dataframe
df.show()

+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|            director|                cast|             country|        date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|     Kirsten Johnson|                null|       United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Water|                null|Ama Qamata, Khosi...|        South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|
|     s3|TV Show|           Ganglan

In [11]:
# Read CSV data with an explicit shecma
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DataType

schema = StructType([
    StructField("show_id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("title", StringType(), True), 
    StructField("director", StringType(), True),
    StructField("cast", StringType(), True),
    StructField("country", StringType(), True),
    StructField("date_added", StringType(), True),
    StructField("release_year", IntegerType(), True),
    StructField("rating", StringType(), True),
    StructField("duration", StringType(), True),
    StructField("listed_in", StringType(), True),
    StructField("description", StringType(), True)
])

In [18]:
# Read CSSV data using spark
df_csv = (spark.read.format("csv")
      .option("header", "true")
      .option("nullValue", "NA")
      .option("escapeQuotes", "true")
      .option("dateFormat", "MMMM d, yyyy")
      .schema(schema)
      .load("data/netflix_titles.csv"))

In [19]:
# Display the first 5 rows of the dataframe
df.show()

+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|            director|                cast|             country|        date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|     Kirsten Johnson|                null|       United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Water|                null|Ama Qamata, Khosi...|        South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|
|     s3|TV Show|           Ganglan

In [None]:
# Stop spark session
'''spark.stop()'''

In [14]:
# Reading JSON data with spark
# Load JSON data into spark dataframe
df_json = (spark.read.format("json")
           .option("multiline", "true")
           .load("data/nobel_prizes.json"))

In [15]:
# View df_json datafram schema
df_json.printSchema()

root
 |-- category: string (nullable = true)
 |-- laureates: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- firstname: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- motivation: string (nullable = true)
 |    |    |-- share: string (nullable = true)
 |    |    |-- surname: string (nullable = true)
 |-- overallMotivation: string (nullable = true)
 |-- year: string (nullable = true)



In [27]:
# Flatten nested JSON structure
#from pyspark.sql.functions import col, explode

df_flattend = (df_json
               .withColumn("laureates", explode(col("laureates")))
               .select(
                       col("category"),
                       col("year"),
                       col("overallMotivation"), 
                       col("laureates.id"),
                       col("laureates.firstname"),
                       col("laureates.surname"),
                       col("laureates.share"),
                       col("laureates.motivation")
                       )
                )


In [28]:
# Display flattened dataframe
df_flattend.show(truncate=False)

+----------+----+-----------------+----+--------------------------+-----------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|category  |year|overallMotivation|id  |firstname                 |surname    |share|motivation                                                                                                                                                                                                                                                                                                                                                                              |
+----------+----+-----------------+----+--------------------------+-------

In [35]:
# Use a schema to enforce data types for reading data
from pyspark.sql.types import ArrayType
json_schema = StructType(
    [
        StructField("category", StringType(), True),
        StructField("laureates", ArrayType(StructType(
            [StructField("firstname", StringType(), True),
             StructField("id", StringType(), True),
             StructField("motivation", StringType(), True),
             StructField("share", StringType(), True),
             StructField("surname", StringType(), True)]
        ), True), True),
        StructField("overallMotivation", StringType(), True),
        StructField("year", IntegerType(), True)
    ]
)

In [40]:
json_df_with_schema = (
    spark.read.format("json")
    .schema(json_schema)
    .option("multiLine", "true")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "corrupt_record")
    .load("data/nobel_prizes.json")
)

In [46]:
# Get json object and json tuple functions
from pyspark.sql.functions import get_json_object

# Dataframe with a json striing column
df_object = spark.createDataFrame([
    (1, '{"name": "Alice", "age":25}'),
    (2, '{"name": "Bob", "age": 30}')
], ["id", "json_data"])

# Extract the "name" field from the json string column
name_df = df_object.select(get_json_object("json_data", "$.name").alias("name"))

In [47]:
# Cast extracted value to string
name_str_df = name_df.withColumn("name_str", name_df["name"].cast(StringType()))

# Display the extracted dataframe 
name_str_df.show()

+-----+--------+
| name|name_str|
+-----+--------+
|Alice|   Alice|
|  Bob|     Bob|
+-----+--------+



In [48]:
from pyspark.sql.functions import json_tuple

# Dataframe with json string column
df_tuple = spark.createDataFrame([
    (1, '{"name": "Alice", "age": 25}'),
    (2, '{"name": "Bob", "age": 30}')
], ["id", "json_data"])

# Extract name and age fields from json string column
name_age_df = df_tuple.select(json_tuple("json_data", "name", "age").alias("name", "age"))

# Display the extracted dataframe
name_age_df.show()

+-----+---+
| name|age|
+-----+---+
|Alice| 25|
|  Bob| 30|
+-----+---+



In [None]:
# Import nested dataframe
from pyspark.sql.functions import *
spark = (SparkSession.builder
         .appName("nested_dataframe")
         .getOrCreate())

In [52]:
# Load the data
df_nested = (spark.read.format("json")
             .option("multiLine", "true")
             .load("data/Stanford Question Answering Dataset.json"))

In [53]:
# Explode nested data
df_exploded = (
    df_nested.select("title", explode("paragraphs").alias("paragraphs"))
    .select("title", col("paragraphs.context").alias("context"),
            explode(col("paragraphs.qas")).alias("questions"))
)

# Display exploded dataframe
df_exploded.show()

+-------------+--------------------+--------------------+
|        title|             context|           questions|
+-------------+--------------------+--------------------+
|Super_Bowl_50|Super Bowl 50 was...|{[{177, Denver Br...|
|Super_Bowl_50|Super Bowl 50 was...|{[{249, Carolina ...|
|Super_Bowl_50|Super Bowl 50 was...|{[{403, Santa Cla...|
|Super_Bowl_50|Super Bowl 50 was...|{[{177, Denver Br...|
|Super_Bowl_50|Super Bowl 50 was...|{[{488, gold}, {4...|
|Super_Bowl_50|Super Bowl 50 was...|{[{487, "golden a...|
|Super_Bowl_50|Super Bowl 50 was...|{[{334, February ...|
|Super_Bowl_50|Super Bowl 50 was...|{[{133, American ...|
|Super_Bowl_50|Super Bowl 50 was...|{[{487, "golden a...|
|Super_Bowl_50|Super Bowl 50 was...|{[{133, American ...|
|Super_Bowl_50|Super Bowl 50 was...|{[{334, February ...|
|Super_Bowl_50|Super Bowl 50 was...|{[{177, Denver Br...|
|Super_Bowl_50|Super Bowl 50 was...|{[{355, Levi's St...|
|Super_Bowl_50|Super Bowl 50 was...|{[{403, Santa Cla...|
|Super_Bowl_50

In [55]:
# Get unique values with array
df_array_distinct = (
    df_exploded.select("title", "context", 
                       col("questions.id").alias("question_id"), 
                       col("questions.question").alias("question_text"),
                       array_distinct("questions.answers").alias("answers"))
)

# Display distinct array dataframe
df_array_distinct.show()

+-------------+--------------------+--------------------+--------------------+--------------------+
|        title|             context|         question_id|       question_text|             answers|
+-------------+--------------------+--------------------+--------------------+--------------------+
|Super_Bowl_50|Super Bowl 50 was...|56be4db0acb800140...|Which NFL team re...|[{177, Denver Bro...|
|Super_Bowl_50|Super Bowl 50 was...|56be4db0acb800140...|Which NFL team re...|[{249, Carolina P...|
|Super_Bowl_50|Super Bowl 50 was...|56be4db0acb800140...|Where did Super B...|[{403, Santa Clar...|
|Super_Bowl_50|Super Bowl 50 was...|56be4db0acb800140...|Which NFL team wo...|[{177, Denver Bro...|
|Super_Bowl_50|Super Bowl 50 was...|56be4db0acb800140...|What color was us...|[{488, gold}, {52...|
|Super_Bowl_50|Super Bowl 50 was...|56be8e613aeaaa140...|What was the them...|[{487, "golden an...|
|Super_Bowl_50|Super Bowl 50 was...|56be8e613aeaaa140...|What day was the ...|[{334, February 7...|


In [56]:
# Array contains function
from pyspark.sql.functions import array_contains

df_array_contains = spark.createDataFrame(
    [(["apple", "orange", "banana"],),
     (["grape", "kiwi", "melon"],),
     (["pear", "apple", "pineapple"],)],
    ["fruits"]
)

# Display array contains dataframe
(df_array_contains.select("fruits", array_contains("fruits", "apple").alias("contains_apple")).show(truncate=False))

+------------------------+--------------+
|fruits                  |contains_apple|
+------------------------+--------------+
|[apple, orange, banana] |true          |
|[grape, kiwi, melon]    |false         |
|[pear, apple, pineapple]|true          |
+------------------------+--------------+



In [58]:
# Explode outer function
data = [
    {"words": ["hello", "world"]},
    {"words": ["foo", "bar", "baz"]},
    {"words": None}
]

df_outer = spark.createDataFrame(data)

# Display outer explode dataframe
(df_outer.select(explode_outer("words").alias("waord")).show(truncate=False))

+-----+
|waord|
+-----+
|hello|
|world|
|foo  |
|bar  |
|baz  |
|null |
+-----+



In [59]:
# import text processing
from pyspark.sql.functions import *
spark = (SparkSession.builder
         .appName("text-processing")
         .getOrCreate())

In [60]:
# Load the data
df_text = (spark.read.format("csv")
           .option("header", "true")
           .option("multiLine", "true")
           .load("data/Reviews.csv"))

In [61]:
# Dsplay text dataframe
df_text.show(10, truncate=False)

+------+----------+--------------+----------------------------------+--------------------+----------------------+-----+----------+---------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [62]:
# Use regular expression to remove all non-alphabetic charaters
df_clean = (df_text.withColumn("Text", regexp_replace("Text", "[^a-zA-Z]", ""))
            .withColumn("Text", regexp_replace("Text", " +", " ")))

# Display clean dataframe
df_clean.show()

+------+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
|    Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|
+------+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
|250093|B0029NII3C|A3P8CU9874SRK5|        C. christine|                   0|                     0|    2|1316649600|Unwanted Ingredients|Iwasinterestedint...|
|250115|B0013MEB40| A99TG4Q2ZPW7S|"Blu-estLight ""M...|                   0|                     0|    4|1310083200|one of my favorit...|Thesecookiesarebo...|
|250132|B005UBH8WC| AY12DBB0U420B|       Gary Peterson|                   0|                     0|    5|1336348800|A Favorite of My ...|IvebeenusingIcebr...|
|250137|B001EQ57QG| A9X62UCTFNQBE|            