DU (Data Understanding)

2.1 Collect initial data

Import the Spark packages used for data mining and read the datasets.

In [None]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark

from functools import reduce

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, PCA, MinMaxScaler, StandardScaler, VectorAssembler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.stat import Correlation
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, trim, lower, split, count, avg, format_number,isnan,sum as _sum
from pyspark.sql.types import (StructField, StringType, IntegerType, StructType, FloatType, DoubleType)

# Create the Spark session for this notebook (or reuse an existing one),
# with longer executor-heartbeat and network timeouts.
spark = (
    SparkSession.builder
    .appName('iteraction_4')
    .config("spark.executor.heartbeatInterval", "120s")
    .config("spark.network.timeout", "300s")
    .getOrCreate()
)

# Read the two CSV inputs. header=True takes the column names from the first
# row; inferSchema=True makes Spark derive each column's type from the data.
df1 = spark.read.csv('weather.csv', inferSchema=True, header=True)
df2 = spark.read.csv('fires.csv', inferSchema=True, header=True)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


2.2 Describe the data

In [None]:
def _summarize(title, name, frame):
    """Print a preview, the schema, the (rows, columns) shape and the dtypes of *frame*."""
    print(title)
    frame.show()
    frame.printSchema()
    n_rows, n_cols = frame.count(), len(frame.columns)
    print(f"Shape {name}: ({n_rows}, {n_cols})")
    print(f"{name} dtype:", frame.dtypes)


_summarize("Weather DataFrame:", "df1", df1)
_summarize("Fires DataFrame:", "df2", df2)


2.3 Explore the data

Find missing data: count the missing values in each column, identify rows that contain at least one missing value, and compute the share of such rows per country.

In [None]:
# check missing value
df1.select([(col(c).isNull().cast("int")).alias(c) for c in df1.columns]).agg(*[ _sum(c).alias(c) for c in df1.columns ]).show()
df2.select([(col(c).isNull().cast("int")).alias(c) for c in df2.columns]).agg(*[ _sum(c).alias(c) for c in df2.columns ]).show()

# integret DataFrame 1 and DataFrame 2
df3 = df1.join(df2, on=['X', 'Y', 'num', 'country'], how='outer')

# print datatype of integreted DataFrame(df3)
print("df3 dtype:", df3.dtypes)

# count the final rows per country
total_rows_per_country = df3.groupBy("country").agg(count("*").alias("total_count"))
condition = reduce(lambda x, y: x | y, [col(c).isNull() for c in df3.columns])
missing_rows_per_country = df3.filter(condition).groupBy("country").agg(count("*").alias("missing_count"))

# count missing rate
missing_ratio_per_country = missing_rows_per_country.join(
    total_rows_per_country,
    on="country",
    how="inner"
).withColumn(
    "missing_ratio",
    col("missing_count") / col("total_count")
).select(
    "country",
    "missing_ratio"
).orderBy(col("missing_ratio").desc())

# show result
missing_ratio_per_country.show()

2.4 Verify the data quality

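Clean and encode the data, fill missing numeric values with column medians, re-check for remaining missing values, compute a correlation matrix, and visualise the monthly record counts and the distribution of `area`.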

In [None]:
# Clean the data: encode the "month", "day" and "country" labels as numbers.
# Lookup tables from text label to numeric code.
_MONTH_CODES = {
    'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6,
    'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12,
}
_DAY_CODES = {'mon': 1, 'tue': 2, 'wed': 3, 'thu': 4, 'fri': 5, 'sat': 6, 'sun': 7}
# Match the correct spelling 'Portugal' as well as the misspelling 'Portgual',
# so rows are encoded whichever spelling the data uses.
_COUNTRY_CODES = {'Portugal': 1, 'Portgual': 1, 'Brazil': 2}


def _encode_labels(column_name, codes):
    """Return a Column mapping each label in *codes* to its numeric code.

    Labels are tested in the dict's insertion order; values matching no
    label are passed through unchanged.
    """
    encoded = None
    for label, code in codes.items():
        matches = col(column_name) == label
        encoded = when(matches, code) if encoded is None else encoded.when(matches, code)
    return encoded.otherwise(col(column_name))


# Under Spark's default type coercion, mixing integer codes with a string
# column yields a string column, so month/day are cast back to int. A value
# that is neither a known label nor a number becomes null and is
# median-filled below.
df3 = df3.withColumn("month", _encode_labels("month", _MONTH_CODES).cast("int"))
df3 = df3.withColumn("day", _encode_labels("day", _DAY_CODES).cast("int"))
# Country is not cast, so countries other than those listed keep their
# original label instead of becoming null.
df3 = df3.withColumn("country", _encode_labels("country", _COUNTRY_CODES))

# Number of records per month (rows with a non-null "area")
monthly_counts = df3.groupBy("month").agg(count("area").alias("area"))

# Use median values to fill the missing values of the numeric columns.
numeric_columns = [field.name for field in df3.schema.fields
                   if isinstance(field.dataType, (DoubleType, IntegerType, FloatType))]
# approxQuantile returns one list of quantiles per column, i.e.
# [[median_a], [median_b], ...], with an empty list for an all-null column.
# A relative error of 0.01 keeps the estimate close to the true median
# (0.25 would allow anything between the 25th and 75th percentile).
median_values = df3.approxQuantile(numeric_columns, [0.5], 0.01) if numeric_columns else []
median_dict = {
    name: quantiles[0]
    for name, quantiles in zip(numeric_columns, median_values)
    if quantiles
}

df4 = df3.fillna(median_dict)

df4.show()
print("Statistical Description:")
df4.describe().show()

# Get the shape of the DataFrame
shape = (df4.count(), len(df4.columns))
print("Shape:", shape)

# Count the remaining missing values per column. isnan() is only applied to
# float/double columns, the only column types that can hold NaN.
float_columns = {field.name for field in df4.schema.fields
                 if isinstance(field.dataType, (DoubleType, FloatType))}
missing_count_exprs = []
for c in df4.columns:
    is_missing = col(c).isNull()
    if c in float_columns:
        is_missing = isnan(c) | is_missing
    missing_count_exprs.append(count(when(is_missing, c)).alias(c))
df4.select(missing_count_exprs).show()

# Verify the data types
print("Data Types:", df4.dtypes)

# Compute the correlation matrix. VectorAssembler only accepts numeric, boolean
# and vector inputs and, by default (handleInvalid='error'), fails on nulls, so
# use only the numeric columns that were median-filled above.
corr_columns = [name for name in numeric_columns if name in median_dict]
vector_col = "corr_features"
assembler = VectorAssembler(inputCols=corr_columns, outputCol=vector_col)
df_vector = assembler.transform(df4).select(vector_col)
matrix = Correlation.corr(df_vector, vector_col).collect()[0][0]
corr_matrix = matrix.toArray()

print("Correlation Matrix:", corr_matrix)

# Convert the correlation matrix to a DataFrame (one column per feature,
# rows in the same order) and show it
corr_df = spark.createDataFrame(
    corr_matrix.tolist(),
    corr_columns
)

corr_df.show()

In [None]:
# 将结果转换为pandas DataFrame以进行可视化
monthly_counts_pd = monthly_counts.toPandas()
df3_pd = df3.toPandas()

import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when
from functools import reduce

# 启用内联绘图模式
%matplotlib inline

# 设置绘图的默认参数
plt.rcParams['figure.figsize'] = [8, 8]

# Records
plt.figure(figsize=(10, 6))
plt.bar(monthly_counts_pd['month'], monthly_counts_pd['area'], color='skyblue')
plt.xlabel('Month')
plt.ylabel('Number of Records')
plt.title('Relationship Between the Number of Area Records and Month')
plt.show()

# Relationship
plt.figure(figsize=(10, 6))
plt.bar(df3_pd['month'], df3_pd['area'])
plt.xlabel('Month')
plt.ylabel('Area')
plt.title('Relationship between area and month')
plt.show()

# Distribution Plot
sns.distplot(df3_pd['area'])
plt.title('Distribution of Area')
plt.show()
