In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import max
import time
import os
import json
import requests
import pandas as pd
from datetime import datetime

In [2]:
location_parameters = {
    67569: ['um050', 'pressure', 'humidity', 'temperature', 'um005', 'um003', 'pm1', 'um025', 'um010', 'um100', 'pm25',
            'pm10'],  # Bangalore
    288233: ["pm25", "um100", "pressure", "um010", "um025", "pm10", "um003", "humidity", "um005", "pm1", "temperature",
             "um050"],  # Kalol, Gandhi Nagar
    64931: ["um100", "um050", "um005", 'pm10', 'um025', 'pm1', 'pm25', 'um003', 'um010'],  # Bhatinda
    66673: ['um003', 'um003', 'pm10', 'um005', 'pm25', 'um010''um100''pressure', 'um050', 'um025', 'pm1', 'temperature',
            'humidity'],  # Hisar
    8118: ["pm25"],  # New Delhi
    62543: ['pressure', 'pm25', 'um010', 'humidity', 'um003', 'temperature', 'um100', 'um025', 'um050', 'um005', 'pm10',
            'pm1'],  # Greater Kailash 2
    1667903: ['um010', 'humidity', 'temperature', 'um050', 'pm1', 'um003', 'um005', 'pm25', 'pm10', 'um025', 'pressure',
              'um100'],  # 15 Oak Drive Outdoor
    362098: ['temperature', 'um050', 'pressure', 'um003', 'um005', 'humidity', 'um025', 'um100', 'um010', 'pm10', 'pm1',
             'pm25'],  # Greater Noida
    67569: ['pm10', 'um010', 'um100', 'um050', 'um005', 'pressure', 'um025', 'um003', 'temperature', 'pm1', 'humidity',
            'pm25'],  # Tarkeshwar, West Bengal
    8172: ['pm25'],  # Kolkata
    220704: ['temperature', 'um010', 'pm10', 'pm25', 'pressure', 'um025', 'humidity', 'temperature', 'um003', 'um100',
             'um050', 'um005', 'pm1'],  # Kharagpur, West Bengal
    8039: ['pm25'],  # Mumbai
    8557: ['pm25'],  # Hyderabad
    63704: ['um003', 'um025', 'pm10', 'pm1', 'humidity', 'temperature', 'um010', 'um005', 'um050', 'pm25', 'um100',
            'pressure'],  # Madikeri, Karnataka
    229138: ['um050', 'pm10', 'temperature', 'humidity', 'pm1', 'um003', 'pressure', 'um025', 'um005', 'um010', 'um100',
             'pm25'],  # Srinivaspur, Karnataka
    8558: ['pm25']  # Chennai
}

# creating a spark session
spark = SparkSession.builder\
    .appName("warehouse_dump")\
    .getOrCreate()

# get the warehouse table
schema="locationId INT, local_time TIMESTAMP, parameter STRING, value DOUBLE"
df = spark.read\
    .option('schema',schema)\
    .orc('hdfs://localhost:9000/user/OpenAQ/data/input')

# get the max date
start_date = df.agg(max('utc').cast('date')).collect()[0][0]

# get the today's date
today_datetime = datetime.fromtimestamp(time.time())
today_date = today_datetime.date()
# creating a date range 
date_range = pd.date_range(start=start_date, end=today_date, freq='D')

# reading the local data
for i in range(len(date_range)-1):
    for location_id in location_parameters.keys():
        date_from = str(date_range[i])[:10]
        date_to = str(date_range[i + 1])[:10]
        dy = date_from[-2:]
        mnt = date_from[5:7]
        yr = date_from[:4]
        for parameter in location_parameters[location_id]:
            try:
                data = spark.read \
                    .option('multiline', True) \
                    .json(f'/home/sad7_5407/Desktop/Data_Engineering/data/{yr}/{mnt}/{dy}/{location_id}/{parameter}.json')
                # selecting the required columns
                final_df = data.select('locationId', 'date.utc', 'parameter', 'value')
                # # dumping into warehouse
                # final_df.write.format("orc") \
                #     .mode('append').save("hdfs://localhost:9000/user/OpenAQ/data/input")
                final_df.show(truncate=False)
            except AnalysisException:
                print(f'inferSchema failed for {location_id}/{parameter}..')
            time.sleep(2)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/12 01:03:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

inferSchema failed for 67569/pm10..
inferSchema failed for 67569/um010..
inferSchema failed for 67569/um100..
inferSchema failed for 67569/um050..
inferSchema failed for 67569/um005..
inferSchema failed for 67569/pressure..
inferSchema failed for 67569/um025..
inferSchema failed for 67569/um003..
inferSchema failed for 67569/temperature..
inferSchema failed for 67569/pm1..
inferSchema failed for 67569/humidity..
inferSchema failed for 67569/pm25..
inferSchema failed for 288233/pm25..
inferSchema failed for 288233/um100..
inferSchema failed for 288233/pressure..
inferSchema failed for 288233/um010..
inferSchema failed for 288233/um025..
inferSchema failed for 288233/pm10..
inferSchema failed for 288233/um003..
inferSchema failed for 288233/humidity..
inferSchema failed for 288233/um005..
inferSchema failed for 288233/pm1..
inferSchema failed for 288233/temperature..
inferSchema failed for 288233/um050..
inferSchema failed for 64931/um100..
inferSchema failed for 64931/um050..
inferSchem

In [5]:
data = spark.read \
                    .option('multiline', True) \
                    .json(f'/home/sad7_5407/Desktop/Data_Engineering/data/2024/02/09/67569/pm10.json')

final_df = data.select('locationId', 'date.utc', 'parameter', 'value')

+----------+-------------------------+---------+-----+
|locationId|utc                      |parameter|value|
+----------+-------------------------+---------+-----+
|67569     |2024-02-10T05:27:47+00:00|pm10     |62.7 |
|67569     |2024-02-10T05:25:47+00:00|pm10     |62.5 |
|67569     |2024-02-10T05:23:47+00:00|pm10     |62.8 |
|67569     |2024-02-10T05:21:47+00:00|pm10     |61.4 |
|67569     |2024-02-10T05:17:47+00:00|pm10     |62.2 |
|67569     |2024-02-10T05:13:47+00:00|pm10     |62.0 |
|67569     |2024-02-10T05:11:47+00:00|pm10     |61.7 |
|67569     |2024-02-10T05:07:47+00:00|pm10     |65.2 |
|67569     |2024-02-10T05:05:47+00:00|pm10     |68.4 |
|67569     |2024-02-10T05:03:47+00:00|pm10     |64.4 |
|67569     |2024-02-10T05:01:47+00:00|pm10     |63.8 |
|67569     |2024-02-10T04:59:47+00:00|pm10     |66.4 |
|67569     |2024-02-10T04:57:47+00:00|pm10     |65.6 |
|67569     |2024-02-10T04:55:46+00:00|pm10     |66.3 |
|67569     |2024-02-10T04:53:46+00:00|pm10     |66.8 |
|67569    

In [6]:
final_df.show(truncate=False)

24/02/12 01:20:40 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 36)
org.apache.spark.SparkFileNotFoundException: File file:/home/sad7_5407/Desktop/Data_Engineering/data/2024/02/09/67569/pm10.json does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:780)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:220)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:279)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sq

Py4JJavaError: An error occurred while calling o671.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 36) (ASPIRE-5 executor driver): org.apache.spark.SparkFileNotFoundException: File file:/home/sad7_5407/Desktop/Data_Engineering/data/2024/02/09/67569/pm10.json does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:780)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:220)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:279)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3549)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkFileNotFoundException: File file:/home/sad7_5407/Desktop/Data_Engineering/data/2024/02/09/67569/pm10.json does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:780)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:220)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:279)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [2]:
recent_df = spark.read\
    .csv('hdfs://localhost:9000/user/OpenAQ/data/latest',header=True)

recent_df.show(truncate=False)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/02/12 02:29:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


                                                                                

+----------+-------------------------+-----------+-----+
|locationId|utc                      |parameter  |value|
+----------+-------------------------+-----------+-----+
|66673     |2024-02-07T05:28:44+00:00|temperature|73.0 |
|66673     |2024-02-07T05:26:43+00:00|temperature|73.0 |
|66673     |2024-02-07T05:24:44+00:00|temperature|72.0 |
|66673     |2024-02-07T05:22:43+00:00|temperature|72.0 |
|66673     |2024-02-07T05:20:43+00:00|temperature|73.0 |
|66673     |2024-02-07T05:18:43+00:00|temperature|73.0 |
|66673     |2024-02-07T05:16:44+00:00|temperature|73.0 |
|66673     |2024-02-07T05:14:44+00:00|temperature|74.0 |
|66673     |2024-02-07T05:12:44+00:00|temperature|74.0 |
|66673     |2024-02-07T05:10:43+00:00|temperature|74.0 |
|66673     |2024-02-07T05:08:44+00:00|temperature|74.0 |
|66673     |2024-02-07T05:06:43+00:00|temperature|74.0 |
|66673     |2024-02-07T05:04:43+00:00|temperature|74.0 |
|66673     |2024-02-07T05:02:43+00:00|temperature|74.0 |
|66673     |2024-02-07T05:00:44

In [None]:
schema="locationId INT, utc TIMESTAMP, parameter STRING, value DOUBLE"
wrh_data = spark.read\
    .option('schema',schema)\
    .orc('hdfs://localhost:9000/user/OpenAQ/data/input')

wrh_data.show(truncate=False)

In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import max
import time
import os
import json
import requests
import pandas as pd
from datetime import datetime

def etl_process():
    def save_data():
        govt_sensors=set([8039, 8118, 8172, 8557, 8558])
        if location_id in govt_sensors:
            # govt
            url = f"https://api.openaq.org/v2/measurements?location_id={location_id}&parameter={parameter}&date_from={date_from}T00:00:00+05:30&date_to={date_to}T00:00:00+05:30"
        else:
            # community
            url = f"https://api.openaq.org/v2/measurements?location_id={location_id}&parameter={parameter}&date_from={date_from}T21:00:00+05:30&date_to={date_to}T00:00:00+05:30"
        response = requests.get(url)
        if response.status_code == 200:
            print(f'Hit Success on {url}.....')
            # Access the response data in JSON format
            data = response.json()
            # check if you've got some data or not
            if len(data['results']) > 0:
                with open(f'/home/sad7_5407/Desktop/Data_Engineering/data/{yr}/{mnt}/{dy}/{location_id}/{parameter}.json', 'w') as file:
                    json.dump(data['results'], file, indent=2)
                    print(f'data/{yr}/{mnt}/{dy}/{location_id}/{parameter}.json saved locally....')
            else:
                print(f'>>>>>>Nothing on {url}')
        else:
            print(f">>>>>>Error in hit on {url}: {response.status_code}")
        time.sleep(2)

    location_parameters = {
        67569: ['um050', 'pressure', 'humidity', 'temperature', 'um005', 'um003', 'pm1', 'um025', 'um010', 'um100', 'pm25',
                'pm10'],  # Bangalore
        288233: ["pm25", "um100", "pressure", "um010", "um025", "pm10", "um003", "humidity", "um005", "pm1", "temperature",
                 "um050"],  # Kalol, Gandhi Nagar
        64931: ["um100", "um050", "um005", 'pm10', 'um025', 'pm1', 'pm25', 'um003', 'um010'],  # Bhatinda
        66673: ['um003', 'um003', 'pm10', 'um005', 'pm25', 'um010''um100''pressure', 'um050', 'um025', 'pm1', 'temperature',
                'humidity'],  # Hisar
        8118: ["pm25"],  # New Delhi
        62543: ['pressure', 'pm25', 'um010', 'humidity', 'um003', 'temperature', 'um100', 'um025', 'um050', 'um005', 'pm10',
                'pm1'],  # Greater Kailash 2
        1667903: ['um010', 'humidity', 'temperature', 'um050', 'pm1', 'um003', 'um005', 'pm25', 'pm10', 'um025', 'pressure',
                  'um100'],  # 15 Oak Drive Outdoor
        362098: ['temperature', 'um050', 'pressure', 'um003', 'um005', 'humidity', 'um025', 'um100', 'um010', 'pm10', 'pm1',
                 'pm25'],  # Greater Noida
        67569: ['pm10', 'um010', 'um100', 'um050', 'um005', 'pressure', 'um025', 'um003', 'temperature', 'pm1', 'humidity',
                'pm25'],  # Tarkeshwar, West Bengal
        8172: ['pm25'],  # Kolkata
        220704: ['temperature', 'um010', 'pm10', 'pm25', 'pressure', 'um025', 'humidity', 'temperature', 'um003', 'um100',
                 'um050', 'um005', 'pm1'],  # Kharagpur, West Bengal
        8039: ['pm25'],  # Mumbai
        8557: ['pm25'],  # Hyderabad
        63704: ['um003', 'um025', 'pm10', 'pm1', 'humidity', 'temperature', 'um010', 'um005', 'um050', 'pm25', 'um100',
                'pressure'],  # Madikeri, Karnataka
        229138: ['um050', 'pm10', 'temperature', 'humidity', 'pm1', 'um003', 'pressure', 'um025', 'um005', 'um010', 'um100',
                 'pm25'],  # Srinivaspur, Karnataka
        8558: ['pm25']  # Chennai
    }

    # creating a spark session
    spark = SparkSession.builder\
        .appName("warehouse_dump")\
        .getOrCreate()

    # get the max date
    start_date = datetime(2024,2,1)

    # get the today's date
    today_datetime = datetime.fromtimestamp(time.time())
    today_date = today_datetime.date()

    date_range = pd.date_range(start=start_date, end=today_date, freq='D')

    # hitting the api and getting the data for the dates
    for i in range(len(date_range)-1):
        date_from = str(date_range[i])[:10]
        date_to = str(date_range[i+1])[:10]

        dy = date_from[-2:]
        mnt = date_from[5:7]
        yr = date_from[:4]

        for location_id in location_parameters.keys():
            # make the directory of location_id
            os.makedirs(f'/home/sad7_5407/Desktop/Data_Engineering/data/{yr}/{mnt}/{dy}/{location_id}', exist_ok=True)
            for parameter in location_parameters[location_id]:
                save_data()

    # reading the local data
    for i in range(len(date_range)-1):
        for location_id in location_parameters.keys():
            date_from = str(date_range[i])[:10]
            date_to = str(date_range[i + 1])[:10]
            dy = date_from[-2:]
            mnt = date_from[5:7]
            yr = date_from[:4]
            for parameter in location_parameters[location_id]:
                try:
                    if os.path.exists(f'/home/sad7_5407/Desktop/Data_Engineering/data/{yr}/{mnt}/{dy}/{location_id}/{parameter}.json'):
                        data = spark.read \
                            .option('multiline', True) \
                            .json(f'/home/sad7_5407/Desktop/Data_Engineering/data/{yr}/{mnt}/{dy}/{location_id}/{parameter}.json')
                        print(f'>>>>>>> {yr}/{mnt}/{dy}/{location_id}/{parameter}.json read success..',end=' ')
                        
                        # selecting the required columns
                        final_df = data.select('locationId', 'date.utc', 'parameter', 'value')
                        # dropping the duplicates
                        final_df.drop_duplicates()
                        # dumping into warehouse
                        final_df.write.format("orc") \
                            .mode('append').save("hdfs://localhost:9000/user/OpenAQ/data/input")
                        print(f'hdfs dump success...')

                except AnalysisException:
                    print(f'inferSchema failed for data/{yr}/{mnt}/{dy}/{location_id}/{parameter}..')


etl_process()

Hit Success on https://api.openaq.org/v2/measurements?location_id=67569&parameter=pm10&date_from=2024-02-01T21:00:00+05:30&date_to=2024-02-02T00:00:00+05:30.....
data/2024/02/01/67569/pm10.json saved locally....
Hit Success on https://api.openaq.org/v2/measurements?location_id=67569&parameter=um010&date_from=2024-02-01T21:00:00+05:30&date_to=2024-02-02T00:00:00+05:30.....
data/2024/02/01/67569/um010.json saved locally....
Hit Success on https://api.openaq.org/v2/measurements?location_id=67569&parameter=um100&date_from=2024-02-01T21:00:00+05:30&date_to=2024-02-02T00:00:00+05:30.....
data/2024/02/01/67569/um100.json saved locally....
Hit Success on https://api.openaq.org/v2/measurements?location_id=67569&parameter=um050&date_from=2024-02-01T21:00:00+05:30&date_to=2024-02-02T00:00:00+05:30.....
data/2024/02/01/67569/um050.json saved locally....
Hit Success on https://api.openaq.org/v2/measurements?location_id=67569&parameter=um005&date_from=2024-02-01T21:00:00+05:30&date_to=2024-02-02T00:

TypeError: dropDuplicates() got an unexpected keyword argument 'inplace'

In [23]:
# creating a spark session
spark = SparkSession.builder\
    .appName("warehouse_dump")\
    .getOrCreate()

location_parameters = {
        67569: ['um050', 'pressure', 'humidity', 'temperature', 'um005', 'um003', 'pm1', 'um025', 'um010', 'um100', 'pm25',
                'pm10'],  # Bangalore
        288233: ["pm25", "um100", "pressure", "um010", "um025", "pm10", "um003", "humidity", "um005", "pm1", "temperature",
                 "um050"],  # Kalol, Gandhi Nagar
        64931: ["um100", "um050", "um005", 'pm10', 'um025', 'pm1', 'pm25', 'um003', 'um010'],  # Bhatinda
        66673: ['um003', 'um003', 'pm10', 'um005', 'pm25', 'um010''um100''pressure', 'um050', 'um025', 'pm1', 'temperature',
                'humidity'],  # Hisar
        8118: ["pm25"],  # New Delhi
        62543: ['pressure', 'pm25', 'um010', 'humidity', 'um003', 'temperature', 'um100', 'um025', 'um050', 'um005', 'pm10',
                'pm1'],  # Greater Kailash 2
        1667903: ['um010', 'humidity', 'temperature', 'um050', 'pm1', 'um003', 'um005', 'pm25', 'pm10', 'um025', 'pressure',
                  'um100'],  # 15 Oak Drive Outdoor
        362098: ['temperature', 'um050', 'pressure', 'um003', 'um005', 'humidity', 'um025', 'um100', 'um010', 'pm10', 'pm1',
                 'pm25'],  # Greater Noida
        67569: ['pm10', 'um010', 'um100', 'um050', 'um005', 'pressure', 'um025', 'um003', 'temperature', 'pm1', 'humidity',
                'pm25'],  # Tarkeshwar, West Bengal
        8172: ['pm25'],  # Kolkata
        220704: ['temperature', 'um010', 'pm10', 'pm25', 'pressure', 'um025', 'humidity', 'temperature', 'um003', 'um100',
                 'um050', 'um005', 'pm1'],  # Kharagpur, West Bengal
        8039: ['pm25'],  # Mumbai
        8557: ['pm25'],  # Hyderabad
        63704: ['um003', 'um025', 'pm10', 'pm1', 'humidity', 'temperature', 'um010', 'um005', 'um050', 'pm25', 'um100',
                'pressure'],  # Madikeri, Karnataka
        229138: ['um050', 'pm10', 'temperature', 'humidity', 'pm1', 'um003', 'pressure', 'um025', 'um005', 'um010', 'um100',
                 'pm25'],  # Srinivaspur, Karnataka
        8558: ['pm25']  # Chennai
    }

start_date = datetime(2024,2,1)

# get the today's date
today_datetime = datetime.fromtimestamp(time.time())
today_date = today_datetime.date()

date_range = pd.date_range(start=start_date, end=today_date, freq='D')

# reading the local data
for i in range(len(date_range)-1):
    for location_id in location_parameters.keys():
        date_from = str(date_range[i])[:10]
        date_to = str(date_range[i + 1])[:10]
        dy = date_from[-2:]
        mnt = date_from[5:7]
        yr = date_from[:4]
        for parameter in location_parameters[location_id]:
            try:
                if os.path.exists(f'/home/sad7_5407/Desktop/Data_Engineering/data/{yr}/{mnt}/{dy}/{location_id}/{parameter}.json'):
                    data = spark.read \
                        .option('multiline', True) \
                        .json(f'/home/sad7_5407/Desktop/Data_Engineering/data/{yr}/{mnt}/{dy}/{location_id}/{parameter}.json')
                    print(f'>>>>>>> {yr}/{mnt}/{dy}/{location_id}/{parameter}.json read success..',end=' ')
                    
                    # selecting the required columns
                    final_df = data.select('locationId', 'date.utc', 'parameter', 'value')
                
                    # dumping into warehouse
                    final_df.write.format("orc") \
                        .mode('append').save("hdfs://localhost:9000/user/OpenAQ/data/input")
                    print(f'hdfs dump success...')

            except AnalysisException:
                print(f'inferSchema failed for data/{yr}/{mnt}/{dy}/{location_id}/{parameter}..')

>>>>>>> 2024/02/01/67569/pm10.json read success.. 

                                                                                

hdfs dump success...
>>>>>>> 2024/02/01/67569/um010.json read success.. hdfs dump success...
>>>>>>> 2024/02/01/67569/um100.json read success.. hdfs dump success...
>>>>>>> 2024/02/01/67569/um050.json read success.. hdfs dump success...
>>>>>>> 2024/02/01/67569/um005.json read success.. hdfs dump success...
>>>>>>> 2024/02/01/67569/pressure.json read success.. hdfs dump success...
>>>>>>> 2024/02/01/67569/um025.json read success.. hdfs dump success...
>>>>>>> 2024/02/01/67569/um003.json read success.. hdfs dump success...
>>>>>>> 2024/02/01/67569/temperature.json read success.. hdfs dump success...
>>>>>>> 2024/02/01/67569/pm1.json read success.. hdfs dump success...
>>>>>>> 2024/02/01/67569/humidity.json read success.. hdfs dump success...
>>>>>>> 2024/02/01/67569/pm25.json read success.. hdfs dump success...
>>>>>>> 2024/02/01/288233/pm25.json read success.. hdfs dump success...
>>>>>>> 2024/02/01/288233/um100.json read success.. hdfs dump success...
>>>>>>> 2024/02/01/288233/pressur

In [24]:
schema="locationId INT, utc_time TIMESTAMP, parameter STRING, value DOUBLE"
df = spark.read\
    .option('schema',schema)\
    .orc('hdfs://localhost:9000/user/OpenAQ/data/input')
df.show(truncate=False)

+----------+-------------------------+---------+------+
|locationId|utc                      |parameter|value |
+----------+-------------------------+---------+------+
|66673     |2024-02-06T05:21:14+00:00|um003    |71.36 |
|66673     |2024-02-06T05:17:14+00:00|um003    |69.68 |
|66673     |2024-02-06T04:55:14+00:00|um003    |60.9  |
|66673     |2024-02-06T04:35:14+00:00|um003    |69.89 |
|66673     |2024-02-06T04:31:14+00:00|um003    |76.54 |
|66673     |2024-02-06T04:19:12+00:00|um003    |127.79|
|66673     |2024-02-06T04:15:11+00:00|um003    |162.12|
|66673     |2024-02-06T04:07:11+00:00|um003    |165.7 |
|66673     |2024-02-06T04:05:11+00:00|um003    |150.73|
|66673     |2024-02-06T04:03:11+00:00|um003    |155.66|
|66673     |2024-02-06T04:01:12+00:00|um003    |166.69|
|66673     |2024-02-06T03:51:12+00:00|um003    |186.64|
|66673     |2024-02-06T03:37:11+00:00|um003    |145.55|
|66673     |2024-02-06T03:35:11+00:00|um003    |143.18|
|66673     |2024-02-06T03:19:10+00:00|um003    |