In [53]:
# Display the SparkSession pre-created by the kernel (e.g. Jupyter launched via `pyspark`),
# or None on a plain Python kernel. A bare `spark` would raise NameError on a fresh kernel
# and stop "Run All"; the next cell creates (or reuses) the session explicitly.
globals().get("spark")

In [54]:
from pyspark.sql import SparkSession

# Path to the cleaned weather CSV on HDFS
file_path = "hdfs:///user/student/cleaned.csv"

# Create (or reuse) the Spark session.
# Hive support is requested explicitly because a later cell persists the data with
# `saveAsTable` into the Hive database `mydb`. Without it, a session built from a plain
# Python kernel may fall back to Spark's in-memory catalog instead of the Hive metastore.
spark = (
    SparkSession.builder
    .appName("Read CSV from HDFS")
    .enableHiveSupport()
    .getOrCreate()
)

# Read the CSV: the first row supplies column names, and inferSchema
# (at the cost of an extra pass over the file) types the numeric columns
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the DataFrame schema
df.printSchema()



root
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Water Content (m3/m3): double (nullable = true)
 |-- Solar Radiation (W/m2): double (nullable = true)
 |-- Rain (mm): double (nullable = true)
 |-- Temperature (Celcius): double (nullable = true)
 |-- RH (%): double (nullable = true)
 |-- Wind Speed (m/s): double (nullable = true)
 |-- Gust Speed (m/s): double (nullable = true)
 |-- Wind Direction (Degree): double (nullable = true)
 |-- Dew Point (Celcius): double (nullable = true)



                                                                                

## Partition by Month and Save to Hive

In [55]:
from pyspark.sql.functions import to_date, date_format, col

# Map raw CSV headers (which contain spaces, parentheses and slashes) to names that are
# safe for Parquet/Hive. The source spelling "Celcius" is corrected to "Celsius".
COLUMN_RENAMES = {
    "Water Content (m3/m3)": "Water_Content_m3_m3",
    "Solar Radiation (W/m2)": "Solar_Radiation_W_m2",
    "Rain (mm)": "Rain_mm",
    "Temperature (Celcius)": "Temperature_Celsius",
    "RH (%)": "RH_percent",
    "Wind Speed (m/s)": "Wind_Speed_m_s",
    "Gust Speed (m/s)": "Gust_Speed_m_s",
    "Wind Direction (Degree)": "Wind_Direction_Degree",
    "Dew Point (Celcius)": "Dew_Point_Celsius",
}

# withColumnRenamed is a silent no-op for a column that does not exist, so a header mismatch
# would otherwise only surface later as a failed write. Check the schema up front (no Spark
# job is triggered). Columns that are already renamed are accepted, so re-running this cell works.
missing_columns = [
    old_name
    for old_name, new_name in COLUMN_RENAMES.items()
    if old_name not in df.columns and new_name not in df.columns
]
if missing_columns:
    raise ValueError(f"Expected CSV columns not found: {missing_columns}")

for old_name, new_name in COLUMN_RENAMES.items():
    df = df.withColumnRenamed(old_name, new_name)

# Parse 'Date' (read as a string) into a DateType using the yyyy-MM-dd pattern.
# NOTE(review): depending on Spark's parser/ANSI settings, non-matching strings may become null.
df = df.withColumn("Date", to_date("Date", "yyyy-MM-dd"))

# Derive the partition key as a 'yyyy-MM' string (e.g. "2021-05"); a null Date yields a null month
df = df.withColumn("month", date_format(col("Date"), "yyyy-MM"))

# Print schema and show the first 5 rows of the 'month' column
df.printSchema()
df.select("month").show(5)

root
 |-- Date: date (nullable = true)
 |-- Time: string (nullable = true)
 |-- Water_Content_m3_m3: double (nullable = true)
 |-- Solar_Radiation_W_m2: double (nullable = true)
 |-- Rain_mm: double (nullable = true)
 |-- Temperature_Celsius: double (nullable = true)
 |-- RH_percent: double (nullable = true)
 |-- Wind_Speed_m_s: double (nullable = true)
 |-- Gust_Speed_m_s: double (nullable = true)
 |-- Wind_Direction_Degree: double (nullable = true)
 |-- Dew_Point_Celsius: double (nullable = true)
 |-- month: string (nullable = true)

+-------+
|  month|
+-------+
|2021-05|
|2021-05|
|2021-05|
|2021-05|
|2021-05|
+-------+
only showing top 5 rows



In [59]:
# Persist the DataFrame to the Hive metastore as a Parquet table, with one partition per
# distinct "month" value. mode="overwrite" overwrites the existing data of a table with this name.
df.write.saveAsTable(
    "mydb.weather_data_partitioned_by_month",
    format="parquet",
    mode="overwrite",
    partitionBy="month",
)

print("Data successfully partitioned and saved to Hive!")

                                                                                

Data successfully partitioned and saved to Hive!


2024-12-02 22:51:41,623 WARN spark.HeartbeatReceiver: Removing executor driver with no recent heartbeats: 130999 ms exceeds timeout 120000 ms
2024-12-02 22:51:45,465 WARN spark.SparkContext: Killing executors is not supported by current scheduler.
2024-12-06 19:23:31,126 ERROR bonecp.BoneCP: Failed to acquire connection to jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true&useSSL=false. Sleeping for 7000 ms. Attempts left: 5
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 1,331,848 milliseconds ago.  The last packet sent successfully to the server was 1,318,581 milliseconds ago.
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Construc