In [9]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# 初始化SparkSession
from pyspark.sql.types import IntegerType, FloatType, DoubleType
from webencodings import labels
# Create (or reuse) the Spark session used by the rest of the notebook.
spark = SparkSession.builder.appName("PesticidesAnalysis").getOrCreate()

# Locations of the four raw CSV inputs.
pesticides_path = "./data/pesticides.csv"
rainfall_path = "./data/rainfall.csv"
temp_path = "./data/temp.csv"
yield_path = "./data/yield.csv"

# Every file is read with its first row as the header and with Spark
# inferring the column types.
csv_options = dict(header=True, inferSchema=True)

pesticides = spark.read.csv(pesticides_path, **csv_options)
rainfall = spark.read.csv(rainfall_path, **csv_options)
temp = spark.read.csv(temp_path, **csv_options)
yields = spark.read.csv(yield_path, **csv_options)

# Keep only the columns of the pesticides data that are used later on.
selected_pesticides = pesticides.select(
    'Area',
    'Year',
    'Value',
)

# Preview the first few rows of the DataFrame.
selected_pesticides.show(5)


+-------+----+-----+
|   Area|Year|Value|
+-------+----+-----+
|Albania|1990|121.0|
|Albania|1991|121.0|
|Albania|1992|121.0|
|Albania|1993|121.0|
|Albania|1994|201.0|
+-------+----+-----+
only showing top 5 rows



In [10]:

# Select the columns of interest. The area column is addressed as ' Area'
# (with a leading space), which is the name this cell has always used for it.
selected_rainfall = rainfall.select(
    ' Area',
    'Year',
    'average_rain_fall_mm_per_year',
)

# Preview the first few rows of the DataFrame.
selected_rainfall.show(5)



+-----------+----+-----------------------------+
|       Area|Year|average_rain_fall_mm_per_year|
+-----------+----+-----------------------------+
|Afghanistan|1985|                          327|
|Afghanistan|1986|                          327|
|Afghanistan|1987|                          327|
|Afghanistan|1989|                          327|
|Afghanistan|1990|                          327|
+-----------+----+-----------------------------+
only showing top 5 rows



In [11]:

# Select the columns of interest from the temperature data.
selected_temp = temp.select(
    'year',
    'country',
    'avg_temp',
)

# Preview the first few rows of the DataFrame.
selected_temp.show(5)


+----+-------------+--------+
|year|      country|avg_temp|
+----+-------------+--------+
|1849|Côte D'Ivoire|   25.58|
|1850|Côte D'Ivoire|   25.52|
|1851|Côte D'Ivoire|   25.67|
|1852|Côte D'Ivoire|    null|
|1853|Côte D'Ivoire|    null|
+----+-------------+--------+
only showing top 5 rows



In [14]:

# Select the columns of interest from the crop-yield data.
selected_yields = yields.select(
    'Area',
    'Item',
    'Year',
    'Value',
)

# Preview the first few rows of the DataFrame.
selected_yields.show(5)

+-----------+-----+----+-----+
|       Area| Item|Year|Value|
+-----------+-----+----+-----+
|Afghanistan|Maize|1961|14000|
|Afghanistan|Maize|1962|14000|
|Afghanistan|Maize|1963|14260|
|Afghanistan|Maize|1964|14257|
|Afghanistan|Maize|1965|14400|
+-----------+-----+----+-----+
only showing top 5 rows



In [15]:
def clean_data(df, columns=None, relative_error=0.05, iqr_multiplier=1.5):
    """Drop rows containing nulls, then remove IQR outliers column by column.

    Columns are processed one after another in schema order: for each one the
    quartiles are estimated on the rows that survived the previous columns'
    filters, and rows outside ``[Q1 - k * IQR, Q3 + k * IQR]`` are removed.

    Args:
        df: Spark DataFrame to clean.
        columns: Optional iterable of column names to restrict the outlier
            filter to. Names that are not numeric columns of ``df`` are
            ignored. ``None`` (the default) filters every numeric column.
        relative_error: Relative error passed to ``DataFrame.approxQuantile``.
        iqr_multiplier: The factor ``k`` applied to the IQR when computing the
            lower and upper bounds.

    Returns:
        A new DataFrame without null-containing rows and without rows whose
        value in any processed column lies outside that column's bounds.
    """
    # Spark SQL type names, as reported by ``df.dtypes``, treated as numeric.
    # Decimal types carry precision/scale (e.g. 'decimal(10,2)') and are
    # matched by prefix below.
    numeric_types = {'tinyint', 'smallint', 'int', 'bigint', 'float', 'double'}

    # 1. Drop every row that has a null in any column.
    df = df.dropna()

    numeric_columns = [
        name for name, dtype in df.dtypes
        if dtype in numeric_types or dtype.startswith('decimal')
    ]
    if columns is not None:
        wanted = set(columns)
        numeric_columns = [name for name in numeric_columns if name in wanted]

    # 2. Remove outliers with the IQR rule.
    for column in numeric_columns:
        quantiles = df.approxQuantile(column, [0.25, 0.75], relative_error)
        if len(quantiles) < 2:
            # No quantiles came back (e.g. no rows are left), so there is
            # nothing to filter on for this column.
            continue

        q1, q3 = quantiles
        iqr = q3 - q1
        lower_bound = q1 - iqr_multiplier * iqr
        upper_bound = q3 + iqr_multiplier * iqr

        # Keep only the rows inside the inclusive [lower, upper] range.
        df = df.filter((df[column] >= lower_bound) & (df[column] <= upper_bound))

    return df


# For each dataset: preview the selected columns, clean them, then preview
# the cleaned result. DataFrame.show() prints the table itself and returns
# None, so it is called directly rather than wrapped in print(), which would
# emit a stray "None" line after every table.

# Pesticides
selected_pesticides.show(5)
cleaned_pesticides = clean_data(selected_pesticides)
cleaned_pesticides.show(5)

# Temperature
selected_temp.show(5)
cleaned_temp = clean_data(selected_temp)
cleaned_temp.show(5)

# Crop yields
selected_yields.show(5)
cleaned_yields = clean_data(selected_yields)
cleaned_yields.show(5)

# Rainfall
selected_rainfall.show(5)
cleaned_rainfall = clean_data(selected_rainfall)
cleaned_rainfall.show(5)

+-------+----+-----+
|   Area|Year|Value|
+-------+----+-----+
|Albania|1990|121.0|
|Albania|1991|121.0|
|Albania|1992|121.0|
|Albania|1993|121.0|
|Albania|1994|201.0|
+-------+----+-----+
only showing top 5 rows

None
+-------+----+-----+
|   Area|Year|Value|
+-------+----+-----+
|Albania|1990|121.0|
|Albania|1991|121.0|
|Albania|1992|121.0|
|Albania|1993|121.0|
|Albania|1994|201.0|
+-------+----+-----+
only showing top 5 rows

None
+----+-------------+--------+
|year|      country|avg_temp|
+----+-------------+--------+
|1849|Côte D'Ivoire|   25.58|
|1850|Côte D'Ivoire|   25.52|
|1851|Côte D'Ivoire|   25.67|
|1852|Côte D'Ivoire|    null|
|1853|Côte D'Ivoire|    null|
+----+-------------+--------+
only showing top 5 rows

None
+----+-------------+--------+
|year|      country|avg_temp|
+----+-------------+--------+
|1849|Côte D'Ivoire|   25.58|
|1850|Côte D'Ivoire|   25.52|
|1851|Côte D'Ivoire|   25.67|
|1856|Côte D'Ivoire|   26.28|
|1857|Côte D'Ivoire|   25.17|
+----+-------------+--

In [16]:

def merge_datasets(pesticides, temp, yields, rainfall):
    """Combine the four datasets into one frame keyed on ``Area`` and ``Year``.

    Each input first has its columns renamed so that the join keys match and
    the value columns get distinct names; the frames are then outer-joined
    one after another (pesticides, temp, yields, rainfall).

    Args:
        pesticides: Frame whose ``Value`` column becomes ``pesticides_tonnes``.
        temp: Frame whose ``country`` / ``year`` columns become
            ``Area`` / ``Year``.
        yields: Frame whose ``Value`` column becomes ``hg_per_ha_yield``.
        rainfall: Frame whose ``' Area'`` column (leading space) becomes
            ``Area`` and whose ``average_rain_fall_mm_per_year`` column
            becomes ``avg_annual_rainfall_mm``.

    Returns:
        The outer join of all four renamed frames on ``['Area', 'Year']``.
    """
    join_keys = ['Area', 'Year']

    pesticides_renamed = pesticides.withColumnRenamed('Value', 'pesticides_tonnes')
    temp_renamed = (
        temp
        .withColumnRenamed('country', 'Area')
        .withColumnRenamed('year', 'Year')
    )
    yields_renamed = yields.withColumnRenamed('Value', 'hg_per_ha_yield')
    rainfall_renamed = (
        rainfall
        .withColumnRenamed(' Area', 'Area')
        .withColumnRenamed('average_rain_fall_mm_per_year', 'avg_annual_rainfall_mm')
    )

    # Fold the remaining frames into the pesticides frame, left to right.
    combined = pesticides_renamed
    for right_side in (temp_renamed, yields_renamed, rainfall_renamed):
        combined = combined.join(right_side, on=join_keys, how='outer')

    return combined

# Merge the four cleaned datasets into a single DataFrame.
merged_data = merge_datasets(cleaned_pesticides, cleaned_temp, cleaned_yields, cleaned_rainfall)

# Preview the merged data. show() prints the table itself and returns None,
# so it is not wrapped in print() (which would add a stray "None" line).
merged_data.show(5)

[Stage 221:>                                                        (0 + 2) / 2]

+-----------+----+-----------------+--------+----+---------------+----------------------+
|       Area|Year|pesticides_tonnes|avg_temp|Item|hg_per_ha_yield|avg_annual_rainfall_mm|
+-----------+----+-----------------+--------+----+---------------+----------------------+
|Afghanistan|1833|             null|   13.91|null|           null|                  null|
|Afghanistan|1834|             null|   13.91|null|           null|                  null|
|Afghanistan|1837|             null|   15.47|null|           null|                  null|
|Afghanistan|1838|             null|    18.5|null|           null|                  null|
|Afghanistan|1841|             null|   14.17|null|           null|                  null|
+-----------+----+-----------------+--------+----+---------------+----------------------+
only showing top 5 rows

None


                                                                                

In [17]:

# Load the combined yield dataset, using the first row as the header and
# letting Spark infer the column types.
yield_df_path = "./data/yield_df.csv"
yield_df = spark.read.csv(
    yield_df_path,
    header=True,
    inferSchema=True,
)

# NOTE(review): `describes` is not defined in any cell visible here --
# presumably it comes from a cell that is not shown; confirm the notebook
# still runs after Restart Kernel -> Run All.
describes(yield_df)

root
 |-- _c0: integer (nullable = true)
 |-- Area: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- hg/ha_yield: integer (nullable = true)
 |-- average_rain_fall_mm_per_year: double (nullable = true)
 |-- pesticides_tonnes: double (nullable = true)
 |-- avg_temp: double (nullable = true)

None


24/05/19 12:08:34 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Area, Item, Year, hg/ha_yield, average_rain_fall_mm_per_year, pesticides_tonnes, avg_temp
 Schema: _c0, Area, Item, Year, hg/ha_yield, average_rain_fall_mm_per_year, pesticides_tonnes, avg_temp
Expected: _c0 but found: 
CSV file: file:///home/ubuntu/722A4/data/yield_df.csv
                                                                                

+-------+----------------+--------+-------+------------------+-----------------+-----------------------------+------------------+------------------+
|summary|             _c0|    Area|   Item|              Year|      hg/ha_yield|average_rain_fall_mm_per_year| pesticides_tonnes|          avg_temp|
+-------+----------------+--------+-------+------------------+-----------------+-----------------------------+------------------+------------------+
|  count|           28242|   28242|  28242|             28242|            28242|                        28242|             28242|             28242|
|   mean|         14120.5|    null|   null|2001.5442957297641|77053.33209404434|            1149.055980454642|37076.909343529136|20.542626584519553|
| stddev|8152.90748751634|    null|   null|7.0519052853951205|84956.61289666739|            709.8121499492227| 59958.78466505776| 6.312050836049751|
|    min|               0| Albania|Cassava|              1990|               50|                         5

24/05/19 12:08:35 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Area, Item, Year, hg/ha_yield, average_rain_fall_mm_per_year, pesticides_tonnes, avg_temp
 Schema: _c0, Area, Item, Year, hg/ha_yield, average_rain_fall_mm_per_year, pesticides_tonnes, avg_temp
Expected: _c0 but found: 
CSV file: file:///home/ubuntu/722A4/data/yield_df.csv
