In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, LongType, ArrayType, MapType
from pyspark.sql.functions import col

In [2]:
# Initialize Spark session.
# Resource, shuffle and network settings are collected here and applied to the
# builder one by one before the session is created.
# NOTE(review): if a SparkSession already exists in this kernel, getOrCreate()
# reuses it and (per the Spark docs) only runtime SQL configs such as
# spark.sql.shuffle.partitions take effect; spark.driver.memory etc. need a
# fresh kernel — confirm when re-running.
spark_settings = {
    "spark.driver.memory": "6g",
    "spark.executor.memory": "12g",
    "spark.executor.cores": "10",
    "spark.sql.shuffle.partitions": "1000",
    "spark.network.timeout": "600s",
}

session_builder = SparkSession.builder.appName("Handle Corrupt Records with Save")
for setting, value in spark_settings.items():
    session_builder = session_builder.config(setting, value)
spark = session_builder.getOrCreate()

In [3]:
# Define the schema
def _nullable_struct(*fields):
    """Build a StructType from (name, dataType) pairs, marking every field nullable.

    Every field of this schema is declared nullable, so the flag is applied once
    here instead of being repeated on each StructField.
    """
    return StructType([StructField(name, data_type, True) for name, data_type in fields])


# `_source.document.trace` sub-record. Field names (including "%cpu" and "%mem")
# are matched against the JSON keys by name, so they are spelled verbatim.
_trace_struct = _nullable_struct(
    ("disk", StringType()),
    ("%cpu", FloatType()),
    ("wchar", LongType()),
    ("container", StringType()),
    ("rchar", LongType()),
    ("module", ArrayType(StringType())),
    ("process", StringType()),
    ("memory", LongType()),
    ("syscw", LongType()),
    ("rss", LongType()),
    ("inv_ctxt", LongType()),
    ("error_action", StringType()),
    ("native_id", StringType()),
    ("read_bytes", LongType()),
    ("attempt", LongType()),
    ("vol_ctxt", LongType()),
    ("peak_vmem", LongType()),
    ("peak_rss", LongType()),
    ("workdir", StringType()),
    ("queue", StringType()),
    ("env", StringType()),
    ("script", StringType()),
    ("name", StringType()),
    ("realtime", LongType()),
    ("submit", LongType()),
    ("status", StringType()),
    ("tag", StringType()),
    ("scratch", StringType()),
    ("cpus", LongType()),
    ("task_id", LongType()),
    ("hash", StringType()),
    ("%mem", FloatType()),
    ("exit", LongType()),
    ("duration", LongType()),
    ("time", LongType()),
    ("write_bytes", LongType()),
    ("start", LongType()),
    ("complete", LongType()),
    ("vmem", LongType()),
    ("syscr", LongType()),
)

# Full document schema: Elasticsearch hit metadata plus the nested `_source`.
# Passing it to the reader lets Spark skip schema inference. Timestamp-like
# fields (@timestamp, utcTime) are declared as strings, so no date parsing
# happens at read time.
schema = _nullable_struct(
    ("_index", StringType()),
    ("_type", StringType()),
    ("_id", StringType()),
    ("_score", FloatType()),
    # Observed values look like field paths, e.g. "document.trace.s…" in df.show().
    ("_ignored", ArrayType(StringType())),
    ("_source", _nullable_struct(
        ("@timestamp", StringType()),
        ("document", _nullable_struct(
            ("runId", StringType()),
            ("runName", StringType()),
            ("utcTime", StringType()),
            ("trace", _trace_struct),
            ("event", StringType()),
        )),
        ("url", _nullable_struct(
            ("path", StringType()),
            ("domain", StringType()),
        )),
        ("host", StringType()),
        ("@version", StringType()),
        ("user_agent", _nullable_struct(
            ("original", StringType()),
        )),
        ("http", _nullable_struct(
            ("version", StringType()),
            ("method", StringType()),
            ("request", _nullable_struct(
                ("body", _nullable_struct(
                    ("bytes", StringType()),
                )),
                ("mime_type", StringType()),
            )),
        )),
        ("event", _nullable_struct(
            ("original", StringType()),
        )),
    )),
)


In [4]:
# Load the raw Elasticsearch export with the explicit schema defined earlier.
# "multiline" lets a single JSON record span several lines of the file.
# NOTE(review): the schema declares no corrupt-record column, so under Spark's
# default PERMISSIVE mode malformed records would show up as null fields rather
# than being captured — confirm this matches the intent of the session's app name.
file_path = "/home/jovyan/data_science_challenge/DataEngineerTakeHomeChallenge/projects/datasets/elasticsearch_data-2024.json"
json_reader = spark.read.schema(schema).option("multiline", "true")
df = json_reader.json(file_path)

In [5]:
# Total number of records parsed from the JSON file (displayed as the cell output).
row_count = df.count()
row_count

505827

In [7]:
# Preview the first 20 rows; long values (including nested structs) are cut to 20 characters.
df.show(n=20, truncate=True)

+--------------------+-----+--------------------+------+--------------------+--------------------+
|              _index|_type|                 _id|_score|            _ignored|             _source|
+--------------------+-----+--------------------+------+--------------------+--------------------+
|user-data-tol-it-...| _doc|SBHgUZIBbkWET0fvB-a_|   1.0|[document.trace.s...|{2024-10-03T10:14...|
|user-data-tol-it-...| _doc|SRHgUZIBbkWET0fvCOZZ|   1.0|[document.trace.s...|{2024-10-03T10:14...|
|user-data-tol-it-...| _doc|5BHgUZIBbkWET0fvGuZm|   1.0|[document.trace.s...|{2024-10-03T10:14...|
|user-data-tol-it-...| _doc|eOLaUZIBejYkdjOqYp8S|   1.0|[document.trace.s...|{2024-10-03T10:08...|
|user-data-tol-it-...| _doc|97HbUZIBYHA1T2qnTNBR|   1.0|[document.trace.s...|{2024-10-03T10:09...|
|user-data-tol-it-...| _doc|-LHbUZIBYHA1T2qnTdA_|   1.0|[document.trace.s...|{2024-10-03T10:09...|
|user-data-tol-it-...| _doc|BbHbUZIBYHA1T2qnX9Nj|   1.0|[document.trace.s...|{2024-10-03T10:09...|
|user-data

In [9]:
# Print the nested column tree to check that the explicit schema was applied as defined.
df.printSchema()

root
 |-- _index: string (nullable = true)
 |-- _type: string (nullable = true)
 |-- _id: string (nullable = true)
 |-- _score: float (nullable = true)
 |-- _ignored: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- _source: struct (nullable = true)
 |    |-- @timestamp: string (nullable = true)
 |    |-- document: struct (nullable = true)
 |    |    |-- runId: string (nullable = true)
 |    |    |-- runName: string (nullable = true)
 |    |    |-- utcTime: string (nullable = true)
 |    |    |-- trace: struct (nullable = true)
 |    |    |    |-- disk: string (nullable = true)
 |    |    |    |-- %cpu: float (nullable = true)
 |    |    |    |-- wchar: long (nullable = true)
 |    |    |    |-- container: string (nullable = true)
 |    |    |    |-- rchar: long (nullable = true)
 |    |    |    |-- module: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- process: string (nullable = true)
 |    |    |

In [8]:
# Persist the parsed DataFrame as Parquet for downstream use.
# mode("overwrite") keeps this cell re-runnable: the default save mode
# ("errorifexists") raises once the output directory exists, which breaks
# Restart & Run All after the first successful run. Note that any existing
# contents at output_path are replaced.
output_path = "/home/jovyan/data_science_challenge/DataEngineerTakeHomeChallenge/projects/datasets/elastic_search/elasticsearch_data-2024.parquet"
df.write.mode("overwrite").parquet(output_path)