In [None]:
import pyspark.sql.functions as f
from pyspark.sql import SparkSession

# Output column names, listed in the positional order used to index each record.
schema = [
    "sid",
    "id",
    "position",
    "created_at",
    "created_meta",
    "updated_at",
    "updated_meta",
    "meta",
    "year",
    "first_name",
    "county",
    "sex",
    "count",
]
# Path of the baby-names JSON file handed to the reader below.
baby_names_path = 'Files/data/databricks_baby_names.json'


def read_json_and_flatten_data(spark, baby_names_path):
    """
    Read the baby-names JSON file and promote each positional value of the
    nested ``data`` array to its own top-level column.

    Parameters:
    - spark: The SparkSession object.
    - baby_names_path: Path to the JSON file containing baby names data.

    Returns:
    - A DataFrame with one row per element of ``data`` and one column per name
      in the module-level ``schema`` list, in that order. Column ``schema[i]``
      holds element ``i`` of the record.
    """
    # multiLine=True: the file is parsed as a JSON document that may span
    # several lines rather than as one JSON object per line.
    raw = spark.read.json(baby_names_path, multiLine=True)

    # One output row per element of the top-level `data` array.
    records = raw.select(f.explode(f.col("data")).alias("record"))

    # Project every positional element in ONE select. Calling withColumn once
    # per column adds a projection to the plan each time, which the PySpark
    # documentation warns against for loops; a single select yields the same
    # columns in the same order, and `record` is simply not selected.
    return records.select(
        *[
            f.col("record").getItem(position).alias(col_name)
            for position, col_name in enumerate(schema)
        ]
    )

def parse_dataframe_with_schema(df_processed, schema):
    """
    Project the flattened DataFrame onto the columns named in ``schema``.

    Parameters:
    - df_processed: DataFrame returned from read_json_and_flatten_data.
    - schema: Iterable of column names; each name is passed to ``select`` as a
      separate argument, so the output columns follow this order.

    Returns:
    - The DataFrame produced by ``df_processed.select`` for those columns.
    """
    return df_processed.select(*schema)


# Run the two-step pipeline: flatten the JSON, then project onto `schema`.
# `spark` and `display` are not defined in this cell; presumably both are
# supplied by the notebook runtime — confirm before running elsewhere.
df_processed = read_json_and_flatten_data(spark, baby_names_path)
df = parse_dataframe_with_schema(df_processed, schema)
display(df)

In [None]:
import pyspark.sql.functions as f
from pyspark.sql import SparkSession

# Column names (in record order) and the input file location used below.
# NOTE(review): no Spark session is created in this cell; `spark` is presumably
# supplied by the notebook runtime — confirm.
schema = ["sid", "id", "position", "created_at", "created_meta", "updated_at", "updated_meta", "meta", "year", "first_name", "county", "sex", "count"]
baby_names_path = 'Files/data/databricks_baby_names.json'

# Do not edit the code above this line.
########################################

def read_json_and_flatten_data(spark, baby_names_path):
    """
    Load the baby-names JSON file and lift the positional values of every
    ``data`` entry into named top-level columns.

    Parameters:
    - spark: The SparkSession object.
    - baby_names_path: Path to the JSON file containing baby names data.

    Returns:
    - A DataFrame with one row per element of ``data`` and the thirteen named
      columns listed in the projections below.
    """
    # SQL projections: index inside the exploded record -> output column name.
    projections = [
        "data[0] as sid",
        "data[1] as id",
        "data[2] as position",
        "data[3] as created_at",
        "data[4] as created_meta",
        "data[5] as updated_at",
        "data[6] as updated_meta",
        "data[7] as meta",
        "data[8] as year",
        "data[9] as first_name",
        "data[10] as county",
        "data[11] as sex",
        "data[12] as count",
    ]

    # The file is read in multi-line mode.
    raw = spark.read.json(baby_names_path, multiLine=True)

    # explode() yields one row per element of the top-level `data` array; the
    # exploded column keeps the name `data`, which the projections index into.
    exploded = raw.select(f.explode("data").alias("data"))
    return exploded.selectExpr(*projections)

def parse_dataframe_with_schema(df_processed, schema):
    """
    Project the flattened DataFrame onto the columns named in ``schema``.

    Parameters:
    - df_processed: DataFrame returned from read_json_and_flatten_data.
    - schema: List of column names, handed to ``select`` as a single argument.

    Returns:
    - The DataFrame produced by ``df_processed.select(schema)``.
    """
    # Keep only the schema columns, in schema order.
    projected = df_processed.select(schema)
    return projected

########################################
# Do not edit the code below this line
# Driver: flatten the JSON, project it onto `schema`, then render the result.
# `spark` and `display` are not defined in this cell; presumably the notebook
# runtime supplies them — confirm.
df_processed = read_json_and_flatten_data(spark, baby_names_path)
df = parse_dataframe_with_schema(df_processed, schema)
display(df)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, explode, regexp_extract, when
import re

# Obtain the Spark session; getOrCreate() hands back an existing session when
# there is one instead of starting another.
spark = (
    SparkSession.builder
    .appName("databricks_de")
    .getOrCreate()
)
# JSON input read below (births with visitor data, going by the file name).
visitors_path = 'Files/data/births-with-visitor-data.json'

def _run_query(query):
    """Run a SQL statement on the module-level ``spark`` session.

    Parameters:
    - query: SQL text passed to ``spark.sql``.

    Returns:
    - Whatever ``collect()`` returns for the resulting DataFrame, i.e. all
      result rows gathered on the driver.
    """
    result_frame = spark.sql(query)
    return result_frame.collect()

def _strip_margin(text):
    return re.sub('\n[ \t]*\|', '\n', text)

# Load the data
# No explicit schema or reader options are passed, so the JSON reader's
# defaults apply.
df = spark.read.json(visitors_path)

# Render the loaded frame in the notebook output.
display(df)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, udf
from pyspark.sql.types import IntegerType, ArrayType, StructType, StringType, StructField
import xml.etree.ElementTree as ET
import re

# Obtain the Spark session; getOrCreate() hands back an existing session when
# there is one instead of starting another.
spark = SparkSession.builder.appName("databricks_de").getOrCreate()
# Same 'Files/data/...' location the JSON is actually read from further down in
# this cell, so the constant no longer points at a path nothing reads.
visitors_path = 'Files/data/births-with-visitor-data.json'

def _run_query(query):
    """Run a SQL statement on the module-level ``spark`` session.

    Parameters:
    - query: SQL text passed to ``spark.sql``.

    Returns:
    - Whatever ``collect()`` returns for the resulting DataFrame, i.e. all
      result rows gathered on the driver.
    """
    result_frame = spark.sql(query)
    return result_frame.collect()

def _strip_margin(text):
    return re.sub('\n[ \t]*\|', '\n', text)

# Load JSON data
# NOTE(review): the path is spelled out here rather than taken from
# `visitors_path` defined earlier in this cell — keep the two in sync.
df = spark.read.json("Files/data/births-with-visitor-data.json")

# UDF to parse XML and extract visitor data
def parse_visitors(xml_string):
    """
    Parse a visitors XML document into a list of visitor records.

    Each direct ``<visitor>`` child of the root element becomes a dict with
    the keys ``visitor_id`` and ``age`` (read from the ``id`` / ``age``
    attributes and converted to ``int``) and ``sex`` (attribute value as-is).

    Parameters:
    - xml_string: XML text, or None / blank when there is no visitor data.

    Returns:
    - A list of dicts, one per ``<visitor>`` element; empty when the input is
      None or blank or the root has no ``<visitor>`` children. A missing or
      blank ``id`` / ``age`` attribute yields None for that field.

    Raises:
    - xml.etree.ElementTree.ParseError: if non-blank text is not well-formed XML.
    - ValueError: if ``id`` or ``age`` is present but not an integer.
    """
    # A null or blank cell would make ET.fromstring raise and take the whole
    # UDF evaluation down with it; treat it as "no visitors".
    if xml_string is None or not xml_string.strip():
        return []

    def _optional_int(raw):
        # int(None) raises TypeError, so map absent/blank attributes to None.
        if raw is None or not raw.strip():
            return None
        return int(raw)

    root = ET.fromstring(xml_string)
    return [
        {
            'visitor_id': _optional_int(visitor.get('id')),
            'age': _optional_int(visitor.get('age')),
            'sex': visitor.get('sex'),
        }
        for visitor in root.findall('visitor')
    ]

# Wrap parse_visitors as a Spark UDF. The declared return type is an array of
# (visitor_id, age, sex) structs, matching the dict keys parse_visitors emits.
parse_visitors_udf = udf(
    parse_visitors,
    ArrayType(
        StructType([
            StructField("visitor_id", IntegerType()),
            StructField("age", IntegerType()),
            StructField("sex", StringType())
        ])
    ),
)

# explode() emits one row per parsed visitor; the struct fields are then
# promoted to top-level columns next to the selected birth-record columns.
# NOTE(review): both the record and the visitor struct carry a `sex` field, so
# the result has two columns named `sex`; the queries below do not reference it.
df = (
    df.withColumn("visitor_data", explode(parse_visitors_udf(col("visitors"))))
    .select("sid", "created_at", "first_name", "county", "sex", "name_count", "visitor_data.*")
)

# Expose the flattened frame to SQL under the name the queries use.
df.createOrReplaceTempView("tempVisitors")

### Part A
# Count the rows of tempVisitors whose sid is not null.
queryA = """
    SELECT COUNT(sid) AS totalRecords
    FROM tempVisitors
"""
query_result = _run_query(queryA)

# An empty result list yields an empty answer string.
partA = f"records={query_result[0][0]}" if query_result else ""

### Part B
# Inner query: number of visitor rows per (sid, county).
# Outer query: average of those counts per county, highest average first.
# The secondary sort key on county makes the LIMIT 1 pick reproducible when
# several counties share the top average; without it the winner among tied
# rows is not defined.
queryB = """
    SELECT county, AVG(visitor_count) AS avgVisitors
    FROM (
        SELECT sid, county, COUNT(visitor_id) AS visitor_count
        FROM tempVisitors
        GROUP BY sid, county
    ) AS visitor_counts
    GROUP BY county
    ORDER BY avgVisitors DESC, county ASC
    LIMIT 1
"""
query_result = _run_query(queryB)

# An empty result list (no rows in the view) yields an empty answer string.
partB = (
    f"county={query_result[0][0]}, avgVisitors={query_result[0][1]}"
    if query_result
    else ""
)

### Part C
# Mean of the age column over the rows whose county is 'KINGS'.
queryC = """
    SELECT AVG(age) AS avgVisitorAge
    FROM tempVisitors
    WHERE county = 'KINGS'
"""
query_result = _run_query(queryC)

# An empty result list yields an empty answer string.
partC = f"avgVisitorAge={query_result[0][0]}" if query_result else ""

### Part D
# Most frequent value of the age column among rows whose county is 'KINGS'.
# The secondary sort key makes the LIMIT 1 pick reproducible when several ages
# share the highest count (the smallest such age wins); without it the winner
# among tied rows is not defined.
queryD = """
    SELECT age AS mostCommonBirthAge, COUNT(*) AS count
    FROM tempVisitors
    WHERE county = 'KINGS'
    GROUP BY age
    ORDER BY count DESC, mostCommonBirthAge ASC
    LIMIT 1
"""
query_result = _run_query(queryD)

# An empty result list (no 'KINGS' rows) yields an empty answer string.
partD = (
    f"mostCommonBirthAge={query_result[0][0]}, count={query_result[0][1]}"
    if query_result
    else ""
)

In [None]:
# Print the four answers, one per line. The f-string's trailing "\n" plus
# print's own newline leaves one blank line after the last answer.
print(f"{partA}\n{partB}\n{partC}\n{partD}\n")
