In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
from pyspark.sql.functions import col, explode
import os
from datetime import datetime
import json

In [16]:
# Create the notebook's Spark session, or attach to one that is already running
spark = (
    SparkSession.builder
    .appName("Read and Flatten Drivers Section with Schema")
    .getOrCreate()
)

In [17]:
# Element type of the "options" array: a struct holding one nullable integer
options_schema = ArrayType(
    StructType([StructField("driver_number", IntegerType(), True)])
)

# "drivers" section: four nullable string attributes followed by the "options" array
drivers_schema = StructType(
    [StructField(name, StringType(), True) for name in ("url", "frequency", "cdc", "type")]
    + [StructField("options", options_schema, True)]
)

# Whole config document: one nullable struct per extract source.
# "intervals" carries the same four string attributes but no "options" array.
schema = StructType([
    StructField("drivers", drivers_schema, True),
    StructField(
        "intervals",
        StructType(
            [StructField(name, StringType(), True) for name in ("url", "frequency", "cdc", "type")]
        ),
        True,
    ),
])

In [18]:
# Location of the extract configuration (a single JSON document spread over several lines)
json_file_path = "/Users/ankitkhati/Learning/Projects/PyProjects/Project-Initial/configs/extractConfig.json"

# Load it with the explicit schema; "multiline" lets the reader accept a
# document that is not laid out one-record-per-line
df = (
    spark.read
    .schema(schema)
    .option("multiline", "true")
    .json(json_file_path)
)

# Which top-level section of the config this run extracts
extractSource = "drivers"

# Promote the fields of that section to top-level columns
drivers_df = df.select(col(extractSource + ".*"))

# Turn each element of the "options" array into its own row, then discard the array
flattened_drivers_df = (
    drivers_df
    .withColumn("option", explode(col("options")))
    .drop("options")
)

# Keep the scalar attributes, lift driver_number out of the exploded struct,
# and collapse identical rows
result_df = flattened_drivers_df.select(
    "url",
    "frequency",
    "cdc",
    "type",
    col("option.driver_number").alias("driver_number"),
).distinct()

# URLs to download, one entry per distinct URL.
# result_df holds one row per distinct driver_number, so taking the url of every
# row would list the same endpoint several times; de-duplicating here avoids
# repeating an identical download. The rows are collected to the driver and read
# there, which avoids a Python RDD lambda for a one-column projection.
url_list = [row.url for row in result_df.select("url").distinct().collect()]

In [19]:
import requests

# Upper bound, in seconds, on how long a single request may wait on the server.
# requests applies no timeout unless one is passed, so without this a stalled
# connection would block the notebook indefinitely.
REQUEST_TIMEOUT_SECONDS = 30

for url in url_list:
    response = requests.get(url, timeout=REQUEST_TIMEOUT_SECONDS)
    # Fail loudly on 4xx/5xx instead of parsing an error body as if it were data
    response.raise_for_status()

    # Wrap the response body in a one-element RDD so Spark can infer the JSON schema
    df_response_text = spark.sparkContext.parallelize([response.text])
    # NOTE(review): df_json is reassigned on every pass, so only the response of the
    # last URL is still available after the loop - confirm that is intended.
    df_json = spark.read.json(df_response_text)

24/08/06 21:18:12 WARN TaskSetManager: Stage 11 contains a task of very large size (1284 KiB). The maximum recommended task size is 1000 KiB.


In [20]:
# Take one "now" snapshot and look at it in three representations
current_date = datetime.now()
time_1, timetz_1, timetuple_1 = (
    current_date.timestamp(),   # seconds since the epoch, as a float
    current_date.timetz(),      # time-of-day portion of the snapshot
    current_date.timetuple(),   # time.struct_time view
)
print(time_1, timetz_1, timetuple_1, sep="\n")

1722993492.412452
21:18:12.412452
time.struct_time(tm_year=2024, tm_mon=8, tm_mday=6, tm_hour=21, tm_min=18, tm_sec=12, tm_wday=1, tm_yday=219, tm_isdst=-1)


In [21]:
# Capture the run time once and split it into the pieces used in the output path
current_date = datetime.now()
year = current_date.year
month = current_date.month
day = current_date.day
hour = current_date.hour
minute = current_date.minute

# Layout: <data root>/<source>/<year>/<month>/<day>/<HHMM>
# Hour and minute are zero-padded to two digits each. Unpadded, 01:15 and 11:05
# both produced "115", so two different runs shared a folder (and, with
# mode("overwrite") below, one replaced the other).
output_directory = f'/Users/ankitkhati/Learning/Projects/PyProjects/Project-Initial/data/{extractSource}/{year}/{month}/{day}/{hour:02d}{minute:02d}'

# Show the schema inferred from the API response, then persist the data as JSON.
# "overwrite" replaces anything already present in the target folder.
df_json.printSchema()
df_json.write.mode("overwrite").json(output_directory)

24/08/06 21:18:12 WARN TaskSetManager: Stage 12 contains a task of very large size (1284 KiB). The maximum recommended task size is 1000 KiB.


root
 |-- broadcast_name: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- driver_number: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- headshot_url: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- meeting_key: long (nullable = true)
 |-- name_acronym: string (nullable = true)
 |-- session_key: long (nullable = true)
 |-- team_colour: string (nullable = true)
 |-- team_name: string (nullable = true)



In [22]:
print(output_directory)

# Break the output path into its '/'-separated segments
path_components = output_directory.split('/')

# The trailing four segments are year, month, day and the hour-minute folder
date_time_components = path_components[-4:]
# Re-join them into a single "year/month/day/hourminute" string
date_time_str = '/'.join(date_time_components)

print(path_components, date_time_components, date_time_str, sep="\n")

/Users/ankitkhati/Learning/Projects/PyProjects/Project-Initial/data/drivers/2024/8/6/2118
['', 'Users', 'ankitkhati', 'Learning', 'Projects', 'PyProjects', 'Project-Initial', 'data', 'drivers', '2024', '8', '6', '2118']
['2024', '8', '6', '2118']
2024/8/6/2118


In [25]:
# Location of the extract configuration on the local filesystem
json_file_path = "/Users/ankitkhati/Learning/Projects/PyProjects/Project-Initial/configs/extractConfig.json"

# Parse the whole document into plain Python dicts/lists
with open(json_file_path, 'r') as config_file:
    config = json.loads(config_file.read())

# Render the parsed configuration in the cell output
display(config)

{'drivers': {'url': 'https://api.openf1.org/v1/drivers',
  'frequency': 'Once',
  'cdc': 'Yes',
  'type': 'Master',
  'options': [{'driver_number': 1}, {'driver_number': 1}]},
 'intervals': {'url': 'https://api.openf1.org/v1/intervals',
  'frequency': 'Once',
  'cdc': 'Yes',
  'type': 'Master'}}

In [26]:
# Record, on the section that was just extracted, the date/time taken from the output path
config[extractSource]["offsetTimestamp"] = date_time_str

# Serialise the updated configuration with 4-space indentation and show it
updated_json_str = json.dumps(config, indent=4)
print(updated_json_str)

{
    "drivers": {
        "url": "https://api.openf1.org/v1/drivers",
        "frequency": "Once",
        "cdc": "Yes",
        "type": "Master",
        "options": [
            {
                "driver_number": 1
            },
            {
                "driver_number": 1
            }
        ],
        "offsetTimestamp": "2024/8/6/2118"
    },
    "intervals": {
        "url": "https://api.openf1.org/v1/intervals",
        "frequency": "Once",
        "cdc": "Yes",
        "type": "Master"
    }
}


In [27]:
# Persist the updated configuration back to the file it was loaded from.
# Opening the live file with "w" truncates it immediately, so an interruption
# mid-write would leave invalid JSON behind. Instead, write a sibling temporary
# file and swap it into place in one step once it is complete.
tmp_config_path = f"{json_file_path}.tmp"
with open(tmp_config_path, "w") as output_file:
    output_file.write(updated_json_str)
os.replace(tmp_config_path, json_file_path)

24/08/07 05:22:17 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1002828 ms exceeds timeout 120000 ms
24/08/07 05:22:17 WARN SparkContext: Killing executors is not supported by current scheduler.
24/08/07 05:38:27 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

In [23]:

# Preview up to the first 20 rows of the flattened config without shortening long values
result_df.show(n=20, truncate=False)

# List the column names, types and nullability of the flattened config
result_df.printSchema()

+---------------------------------+---------+---+------+-------------+
|url                              |frequency|cdc|type  |driver_number|
+---------------------------------+---------+---+------+-------------+
|https://api.openf1.org/v1/drivers|Once     |Yes|Master|1            |
+---------------------------------+---------+---+------+-------------+

root
 |-- url: string (nullable = true)
 |-- frequency: string (nullable = true)
 |-- cdc: string (nullable = true)
 |-- type: string (nullable = true)
 |-- driver_number: integer (nullable = true)

