In [2]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("tourist-enrichment-local")
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:3.3.2,"
        "com.amazonaws:aws-java-sdk-bundle:1.12.767"
    )
    .master("local[*]")   
    .getOrCreate()
)

spark

In [3]:
import sys
sys.path.append("lambda_functions")

from fetch_tourist_estimates import fetch_estimates

In [4]:
import os
os.environ["BUCKET_NAME"] = "bucket-tara-weather-dest-v1"

In [5]:
BUCKET_NAME = os.getenv("BUCKET_NAME")

In [6]:
print(BUCKET_NAME)

bucket-tara-weather-dest-v1


In [7]:
weather_path    = f"s3a://{BUCKET_NAME}/weather_partitioned/"
pollution_path  = f"s3a://{BUCKET_NAME}/pollution_partitioned/"
sensor_path     = f"s3a://{BUCKET_NAME}/sensor_partitioned/"

In [8]:
weather_df = (
    spark.read
    .option("header", True)
    .csv(weather_path)
)

In [9]:
weather_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- time_nano: string (nullable = true)
 |-- time_date: string (nullable = true)
 |-- location_latitude: string (nullable = true)
 |-- location_longitude: string (nullable = true)
 |-- location_name: string (nullable = true)
 |-- weather_temperature: string (nullable = true)
 |-- weather_feelsLike: string (nullable = true)
 |-- weather_pressure: string (nullable = true)
 |-- weather_humidity: string (nullable = true)
 |-- weather_dewPoint: string (nullable = true)
 |-- weather_clouds: string (nullable = true)
 |-- weather_windSpeed: string (nullable = true)
 |-- weather_windDeg: string (nullable = true)
 |-- weather_windGust: string (nullable = true)
 |-- _input_file: string (nullable = true)
 |-- date: date (nullable = true)



In [10]:
pollution_df = (
    spark.read
    .option("header", True)
    .csv(pollution_path)
)

In [11]:
pollution_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- time_nano: string (nullable = true)
 |-- time_date: string (nullable = true)
 |-- location_latitude: string (nullable = true)
 |-- location_longitude: string (nullable = true)
 |-- location_name: string (nullable = true)
 |-- measurement_pm10Atmo: string (nullable = true)
 |-- measurement_pm25Atmo: string (nullable = true)
 |-- measurement_pm100Atmo: string (nullable = true)
 |-- _input_file: string (nullable = true)
 |-- date: date (nullable = true)



In [12]:
from pyspark.sql.functions import to_date, col

In [13]:
weather_df = weather_df.withColumn(
    "date",
    to_date(col("time_date"))  
)

pollution_df = pollution_df.withColumn(
    "date",
    to_date(col("time_date"))
)

In [14]:
weather_df.select("time_date", "date").show(5, truncate=False)
pollution_df.select("time_date", "date").show(5, truncate=False)

+-------------------+----------+
|time_date          |date      |
+-------------------+----------+
|2022-04-28 04:00:00|2022-04-28|
|2022-04-28 04:00:00|2022-04-28|
|2022-04-28 05:00:00|2022-04-28|
|2022-04-28 05:00:00|2022-04-28|
|2022-04-28 06:00:00|2022-04-28|
+-------------------+----------+
only showing top 5 rows

+---------------------+----------+
|time_date            |date      |
+---------------------+----------+
|2022-05-10 07:00:00.0|2022-05-10|
|2022-05-10 07:00:00.0|2022-05-10|
|2022-05-10 08:00:00.0|2022-05-10|
|2022-05-10 08:00:00.0|2022-05-10|
|2022-05-10 09:00:00.0|2022-05-10|
+---------------------+----------+
only showing top 5 rows



In [15]:
dates_df = (
    weather_df.select("date")
    .union(pollution_df.select("date"))
    .distinct()
)

In [18]:
dates_rows = dates_df.collect()
dates = [r["date"].strftime("%Y-%m-%d") for r in dates_rows]
dates = sorted(dates)

In [19]:
dates[:10], len(dates)

(['2022-03-30',
  '2022-03-31',
  '2022-04-01',
  '2022-04-02',
  '2022-04-03',
  '2022-04-04',
  '2022-04-05',
  '2022-04-06',
  '2022-04-07',
  '2022-04-08'],
 48)

In [20]:
def build_iasi_estimate_map(dates):
    estimate_map = {}
    for d in dates:
        print(f"Calling API for date={d}")
        resp = fetch_estimates(d)   # 1 API poziv za taj datum

        est_value = None
        for city_info in resp.get("info", []):
            if city_info.get("name") in ("Iasi", "IaÈ™i"):
                est_value = int(city_info["estimated_no_people"])
                break

        estimate_map[d] = est_value
    return estimate_map

In [21]:
iasi_map = build_iasi_estimate_map(dates)
iasi_map

Calling API for date=2022-03-30




Calling API for date=2022-03-31
Calling API for date=2022-04-01
Calling API for date=2022-04-02
Calling API for date=2022-04-03
Calling API for date=2022-04-04
Calling API for date=2022-04-05
Calling API for date=2022-04-06
Calling API for date=2022-04-07
Calling API for date=2022-04-08
Calling API for date=2022-04-09
Calling API for date=2022-04-10
Calling API for date=2022-04-11
Calling API for date=2022-04-12
Calling API for date=2022-04-13
Calling API for date=2022-04-14
Calling API for date=2022-04-15
Calling API for date=2022-04-16
Calling API for date=2022-04-17
Calling API for date=2022-04-18
Calling API for date=2022-04-19
Calling API for date=2022-04-20
Calling API for date=2022-04-21
Calling API for date=2022-04-22
Calling API for date=2022-04-23
Calling API for date=2022-04-24
Calling API for date=2022-04-25
Calling API for date=2022-04-26
Calling API for date=2022-04-27
Calling API for date=2022-04-28
Calling API for date=2022-04-29
Calling API for date=2022-04-30
Calling 

{'2022-03-30': 1060,
 '2022-03-31': 1184,
 '2022-04-01': 236,
 '2022-04-02': 249,
 '2022-04-03': 181,
 '2022-04-04': 102,
 '2022-04-05': 96,
 '2022-04-06': 180,
 '2022-04-07': 290,
 '2022-04-08': 341,
 '2022-04-09': 304,
 '2022-04-10': 231,
 '2022-04-11': 211,
 '2022-04-12': 284,
 '2022-04-13': 410,
 '2022-04-14': 499,
 '2022-04-15': 497,
 '2022-04-16': 437,
 '2022-04-17': 406,
 '2022-04-18': 467,
 '2022-04-19': 599,
 '2022-04-20': 718,
 '2022-04-21': 755,
 '2022-04-22': 717,
 '2022-04-23': 682,
 '2022-04-24': 728,
 '2022-04-25': 859,
 '2022-04-26': 1003,
 '2022-04-27': 1079,
 '2022-04-28': 1069,
 '2022-04-29': 1036,
 '2022-04-30': 1068,
 '2022-05-01': 238,
 '2022-05-02': 251,
 '2022-05-03': 183,
 '2022-05-04': 104,
 '2022-05-05': 98,
 '2022-05-06': 181,
 '2022-05-07': 292,
 '2022-05-08': 343,
 '2022-05-09': 306,
 '2022-05-10': 233,
 '2022-05-11': 212,
 '2022-05-12': 286,
 '2022-05-13': 412,
 '2022-05-14': 500,
 '2022-05-15': 499,
 '2022-05-16': 439}

In [22]:
from pyspark.sql.functions import date_format, col

In [23]:
weather_df = weather_df.withColumn(
    "date_str",
    date_format(col("date"), "yyyy-MM-dd")
)

pollution_df = pollution_df.withColumn(
    "date_str",
    date_format(col("date"), "yyyy-MM-dd")
)

In [24]:
weather_df.select("time_date", "date", "date_str").show(5, truncate=False)
pollution_df.select("time_date", "date", "date_str").show(5, truncate=False)

+-------------------+----------+----------+
|time_date          |date      |date_str  |
+-------------------+----------+----------+
|2022-04-28 04:00:00|2022-04-28|2022-04-28|
|2022-04-28 04:00:00|2022-04-28|2022-04-28|
|2022-04-28 05:00:00|2022-04-28|2022-04-28|
|2022-04-28 05:00:00|2022-04-28|2022-04-28|
|2022-04-28 06:00:00|2022-04-28|2022-04-28|
+-------------------+----------+----------+
only showing top 5 rows

+---------------------+----------+----------+
|time_date            |date      |date_str  |
+---------------------+----------+----------+
|2022-05-10 07:00:00.0|2022-05-10|2022-05-10|
|2022-05-10 07:00:00.0|2022-05-10|2022-05-10|
|2022-05-10 08:00:00.0|2022-05-10|2022-05-10|
|2022-05-10 08:00:00.0|2022-05-10|2022-05-10|
|2022-05-10 09:00:00.0|2022-05-10|2022-05-10|
+---------------------+----------+----------+
only showing top 5 rows



In [25]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

In [30]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

rows = [(d, est) for d, est in iasi_map.items()]

schema = StructType([
    StructField("date_str", StringType(), False),
    StructField("tourist_estimate", IntegerType(), False),
])

iasi_df = spark.createDataFrame(rows, schema)

iasi_df.show(5, truncate=False)

Py4JJavaError: An error occurred while calling o139.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 1 times, most recent failure: Lost task 0.0 in stage 17.0 (TID 1127) (DESKTOP-DHIESJT.mshome.net executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:189)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:157)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:713)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:757)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:675)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:641)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:617)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:574)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:532)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:176)
	... 29 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:189)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:157)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:713)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:757)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:675)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:641)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:617)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:574)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:532)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:176)
	... 29 more


In [31]:
import csv

rows = [("date_str", "tourist_estimate")]  
for d, est in sorted(iasi_map.items()):
    rows.append((d, str(est)))

local_csv_path = "iasi_estimates.csv"

with open(local_csv_path, "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerows(rows)

local_csv_path

'iasi_estimates.csv'

In [32]:
!aws s3 cp iasi_estimates.csv s3://bucket-tara-weather-dest-v1/lookup/iasi_estimates.csv

Completed 800 Bytes/800 Bytes (490 Bytes/s) with 1 file(s) remaining
upload: .\iasi_estimates.csv to s3://bucket-tara-weather-dest-v1/lookup/iasi_estimates.csv


In [33]:
lookup_path = f"s3a://{BUCKET_NAME}/lookup/iasi_estimates.csv"

iasi_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(lookup_path)
)

iasi_df.show(5, truncate=False)

+-------------------+----------------+
|date_str           |tourist_estimate|
+-------------------+----------------+
|2022-03-30 00:00:00|1060            |
|2022-03-31 00:00:00|1184            |
|2022-04-01 00:00:00|236             |
|2022-04-02 00:00:00|249             |
|2022-04-03 00:00:00|181             |
+-------------------+----------------+
only showing top 5 rows



In [34]:
weather_enriched = (
    weather_df
    .join(iasi_df, on="date_str", how="left")
)

pollution_enriched = (
    pollution_df
    .join(iasi_df, on="date_str", how="left")
)

weather_enriched.select("time_date", "date_str", "tourist_estimate").show(10, truncate=False)
pollution_enriched.select("time_date", "date_str", "tourist_estimate").show(10, truncate=False)

+-------------------+----------+----------------+
|time_date          |date_str  |tourist_estimate|
+-------------------+----------+----------------+
|2022-04-28 04:00:00|2022-04-28|1069            |
|2022-04-28 04:00:00|2022-04-28|1069            |
|2022-04-28 05:00:00|2022-04-28|1069            |
|2022-04-28 05:00:00|2022-04-28|1069            |
|2022-04-28 06:00:00|2022-04-28|1069            |
|2022-04-28 06:00:00|2022-04-28|1069            |
|2022-04-28 07:00:00|2022-04-28|1069            |
|2022-04-28 07:00:00|2022-04-28|1069            |
|2022-04-28 08:00:00|2022-04-28|1069            |
|2022-04-28 09:00:00|2022-04-28|1069            |
+-------------------+----------+----------------+
only showing top 10 rows

+---------------------+----------+----------------+
|time_date            |date_str  |tourist_estimate|
+---------------------+----------+----------------+
|2022-05-10 07:00:00.0|2022-05-10|233             |
|2022-05-10 07:00:00.0|2022-05-10|233             |
|2022-05-10 08

In [41]:
weather_out_path   = f"s3a://{BUCKET_NAME}/weather_partitioned_enriched/"
pollution_out_path = f"s3a://{BUCKET_NAME}/pollution_partitioned_enriched/"

In [48]:
(
    weather_enriched
    .write
    .mode("overwrite")
    .option("header", True)
    .partitionBy("date")
    .csv(weather_out_path)
)

(
    pollution_enriched
    .write
    .mode("overwrite")
    .option("header", True)
    .partitionBy("date")
    .csv(pollution_out_path)
)

Py4JJavaError: An error occurred while calling o216.csv.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:288)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:851)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 in stage 38.0 failed 1 times, most recent failure: Lost task 11.0 in stage 38.0 (TID 1252) (DESKTOP-DHIESJT.mshome.net executor driver): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:655)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:358)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$22(FileFormatWriter.scala:266)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1218)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1423)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:406)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:390)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:340)
	at org.apache.hadoop.fs.RawLocalFileSystem.rename(RawLocalFileSystem.java:505)
	at org.apache.hadoop.fs.ChecksumFileSystem.rename(ChecksumFileSystem.java:694)
	at org.apache.hadoop.hive.ql.io.ProxyLocalFileSystem.rename(ProxyLocalFileSystem.java:34)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:600)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:571)
	at org.apache.spark.mapred.SparkHadoopMapRedUtil$.$anonfun$commitTask$1(SparkHadoopMapRedUtil.scala:51)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:642)
	at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:51)
	at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:78)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:279)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.$anonfun$commit$1(FileFormatDataWriter.scala:107)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:642)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:107)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:342)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:348)
	... 9 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:255)
	... 42 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:655)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:358)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$22(FileFormatWriter.scala:266)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1218)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1423)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:406)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:390)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:340)
	at org.apache.hadoop.fs.RawLocalFileSystem.rename(RawLocalFileSystem.java:505)
	at org.apache.hadoop.fs.ChecksumFileSystem.rename(ChecksumFileSystem.java:694)
	at org.apache.hadoop.hive.ql.io.ProxyLocalFileSystem.rename(ProxyLocalFileSystem.java:34)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:600)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:571)
	at org.apache.spark.mapred.SparkHadoopMapRedUtil$.$anonfun$commitTask$1(SparkHadoopMapRedUtil.scala:51)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:642)
	at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:51)
	at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:78)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:279)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.$anonfun$commit$1(FileFormatDataWriter.scala:107)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:642)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:107)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:342)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:348)
	... 9 more


In [50]:
weather_pd = weather_enriched.toPandas()
pollution_pd = pollution_enriched.toPandas()

weather_pd.dtypes

date_str               object
name                   object
time_nano              object
time_date              object
location_latitude      object
location_longitude     object
location_name          object
weather_temperature    object
weather_feelsLike      object
weather_pressure       object
weather_humidity       object
weather_dewPoint       object
weather_clouds         object
weather_windSpeed      object
weather_windDeg        object
weather_windGust       object
_input_file            object
date                   object
tourist_estimate        int32
dtype: object

In [51]:
pollution_pd.dtypes

date_str                 object
name                     object
time_nano                object
time_date                object
location_latitude        object
location_longitude       object
location_name            object
measurement_pm10Atmo     object
measurement_pm25Atmo     object
measurement_pm100Atmo    object
_input_file              object
date                     object
tourist_estimate          int32
dtype: object

In [45]:
weather_local_path = "data/weather_enriched.csv"
pollution_local_path = "data/pollution_enriched.csv"

weather_pd.to_csv(weather_local_path, index=False)
pollution_pd.to_csv(pollution_local_path, index=False)

In [46]:
!aws s3 cp data/weather_enriched.csv s3://$BUCKET_NAME/weather_partitioned_enriched/weather_enriched.csv

!aws s3 cp data/pollution_enriched.csv s3://$BUCKET_NAME/pollution_partitioned_enriched/pollution_enriched.csv

Completed 1.0 MiB/1.1 MiB (1.3 MiB/s) with 1 file(s) remaining
Completed 1.1 MiB/1.1 MiB (539.5 KiB/s) with 1 file(s) remaining
upload: data\weather_enriched.csv to s3://bucket-tara-weather-dest-v1/weather_partitioned_enriched/weather_enriched.csv
Completed 746.2 KiB/746.2 KiB (1.1 MiB/s) with 1 file(s) remaining
upload: data\pollution_enriched.csv to s3://bucket-tara-weather-dest-v1/pollution_partitioned_enriched/pollution_enriched.csv
