MODERN DATA ANALYTIC

In [28]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time

Import the required libraries, including PySpark.

In [29]:
start_time = time.time()

spark = SparkSession.builder.appName("AirQualityAnalytics").getOrCreate()
data = spark.read.csv('PRSA_data_2010.1.1-2014.12.31.csv', header=True, inferSchema=True)

end_time = time.time()
elapsed_time_loading = end_time - start_time
print(f"Time taken to load data: {elapsed_time_loading} seconds")

Time taken to load data: 0.30864715576171875 seconds


Create a Spark session to enable the usage of Spark functionalities.
Read the CSV file into a Spark DataFrame.
Record the time elapsed to load data

In [30]:
start_time = time.time()
data.selectExpr("min(`pm2.5`)", "max(`pm2.5`)").show()
data_cleaned = data.filter(col("`pm2.5`") != 'NA')
data_cleaned.show()
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken for operations: {elapsed_time} seconds")

+----------+----------+
|min(pm2.5)|max(pm2.5)|
+----------+----------+
|         0|        NA|
+----------+----------+

+---+----+-----+---+----+-----+----+----+------+----+-----+---+---+
| No|year|month|day|hour|pm2.5|DEWP|TEMP|  PRES|cbwd|  Iws| Is| Ir|
+---+----+-----+---+----+-----+----+----+------+----+-----+---+---+
| 25|2010|    1|  2|   0|  129| -16|-4.0|1020.0|  SE| 1.79|  0|  0|
| 26|2010|    1|  2|   1|  148| -15|-4.0|1020.0|  SE| 2.68|  0|  0|
| 27|2010|    1|  2|   2|  159| -11|-5.0|1021.0|  SE| 3.57|  0|  0|
| 28|2010|    1|  2|   3|  181|  -7|-5.0|1022.0|  SE| 5.36|  1|  0|
| 29|2010|    1|  2|   4|  138|  -7|-5.0|1022.0|  SE| 6.25|  2|  0|
| 30|2010|    1|  2|   5|  109|  -7|-6.0|1022.0|  SE| 7.14|  3|  0|
| 31|2010|    1|  2|   6|  105|  -7|-6.0|1023.0|  SE| 8.93|  4|  0|
| 32|2010|    1|  2|   7|  124|  -7|-5.0|1024.0|  SE|10.72|  0|  0|
| 33|2010|    1|  2|   8|  120|  -8|-6.0|1024.0|  SE|12.51|  0|  0|
| 34|2010|    1|  2|   9|  132|  -7|-5.0|1025.0|  SE| 14.3|  0|

Check the data range of pm2.5. Rows with 'NA' values in pm2.5 are cleaned with deletion methods. The resulting DataFrame data_cleaned will not contain those rows.

In [31]:
start_time = time.time()
from pyspark.sql.functions import concat, lit, col, lpad
data = data_cleaned.withColumn(
    "timestamp",
    concat(
        col("`year`"), lit("-"),
        lpad(col("`month`"), 2, "0"), lit("-"),
        lpad(col("`day`"), 2, "0"), lit(" "),
        lpad(col("`hour`"), 2, "0"), lit(":00:00:000")
    )
)
data.show()
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken for timestamp operation: {elapsed_time} seconds")

+---+----+-----+---+----+-----+----+----+------+----+-----+---+---+--------------------+
| No|year|month|day|hour|pm2.5|DEWP|TEMP|  PRES|cbwd|  Iws| Is| Ir|           timestamp|
+---+----+-----+---+----+-----+----+----+------+----+-----+---+---+--------------------+
| 25|2010|    1|  2|   0|  129| -16|-4.0|1020.0|  SE| 1.79|  0|  0|2010-01-02 00:00:...|
| 26|2010|    1|  2|   1|  148| -15|-4.0|1020.0|  SE| 2.68|  0|  0|2010-01-02 01:00:...|
| 27|2010|    1|  2|   2|  159| -11|-5.0|1021.0|  SE| 3.57|  0|  0|2010-01-02 02:00:...|
| 28|2010|    1|  2|   3|  181|  -7|-5.0|1022.0|  SE| 5.36|  1|  0|2010-01-02 03:00:...|
| 29|2010|    1|  2|   4|  138|  -7|-5.0|1022.0|  SE| 6.25|  2|  0|2010-01-02 04:00:...|
| 30|2010|    1|  2|   5|  109|  -7|-6.0|1022.0|  SE| 7.14|  3|  0|2010-01-02 05:00:...|
| 31|2010|    1|  2|   6|  105|  -7|-6.0|1023.0|  SE| 8.93|  4|  0|2010-01-02 06:00:...|
| 32|2010|    1|  2|   7|  124|  -7|-5.0|1024.0|  SE|10.72|  0|  0|2010-01-02 07:00:...|
| 33|2010|    1|  2| 

In [32]:
from pyspark.sql.functions import avg
from pyspark.sql.types import FloatType

start_time = time.time()
data = data.withColumn("`pm2.5`", col("`pm2.5`").cast(FloatType()))
aggregated_data = data.groupBy("`timestamp`").agg(avg("`pm2.5`"))
aggregated_data.show()
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken for aggregation operation: {elapsed_time} seconds")


+--------------------+------------+
|           timestamp|avg(`pm2.5`)|
+--------------------+------------+
|2010-01-15 01:00:...|        37.0|
|2010-02-03 09:00:...|       120.0|
|2010-02-03 14:00:...|        17.0|
|2010-02-13 08:00:...|        24.0|
|2010-02-27 00:00:...|        38.0|
|2010-03-14 10:00:...|        90.0|
|2010-03-17 04:00:...|        41.0|
|2010-03-26 06:00:...|        57.0|
|2010-04-10 20:00:...|        56.0|
|2010-04-17 19:00:...|       134.0|
|2010-04-24 13:00:...|       111.0|
|2010-05-03 22:00:...|       150.0|
|2010-05-04 15:00:...|       104.0|
|2010-05-05 02:00:...|       158.0|
|2010-05-10 21:00:...|        20.0|
|2010-05-18 16:00:...|        54.0|
|2010-05-20 11:00:...|        53.0|
|2010-05-25 08:00:...|        29.0|
|2010-06-15 07:00:...|       196.0|
|2010-06-21 06:00:...|        48.0|
+--------------------+------------+
only showing top 20 rows

Time taken for aggregation operation: 0.48117852210998535 seconds


In [33]:
# Start measuring time for analytics on entire dataset
start_time_full_analytics = time.time()
max_pm25_full = data.agg({"`pm2.5`": "max"}).collect()[0][0]
end_time_full_analytics = time.time()
elapsed_time_full_analytics = end_time_full_analytics - start_time_full_analytics

print(f"\nMaximum PM2.5 value on entire dataset: {max_pm25_full}")
print(f"Time taken for analytics on entire dataset: {elapsed_time_full_analytics} seconds")


Maximum PM2.5 value on entire dataset: 994
Time taken for analytics on entire dataset: 0.23000597953796387 seconds


In [34]:
# Subset of data for demonstration
subset_data = data.sample(fraction=0.1, seed=42)
start_time_subset_analytics = time.time()
max_pm25_subset = subset_data.agg({"`pm2.5`": "max"}).collect()[0][0]
end_time_subset_analytics = time.time()
elapsed_time_subset_analytics = end_time_subset_analytics - start_time_subset_analytics

print(f"Maximum PM2.5 value on subset: {max_pm25_subset}")
print(f"Time taken for analytics on subset: {elapsed_time_subset_analytics} seconds")

Maximum PM2.5 value on subset: 99
Time taken for analytics on subset: 0.22312188148498535 seconds


CONVENTIONAL DATA ANALYSIS

In [35]:
import pandas as pd
import time
start_time = time.time()
data = pd.read_csv('PRSA_data_2010.1.1-2014.12.31.csv')
end_time = time.time()
elapsed_time_loading = end_time - start_time
print(f"Time taken to load data: {elapsed_time_loading} seconds")

Time taken to load data: 0.1011967658996582 seconds


In [36]:
start_time = time.time()

# Calculate min and max of 'pm2.5'
min_max_pm25 = data['pm2.5'].agg(['min', 'max'])
print("Minimum and Maximum PM2.5 values:")
print(min_max_pm25)

# Filter 'NA' values in 'pm2.5'
data_cleaned = data[data['pm2.5'].notnull()]
print("\nCleaned DataFrame (NA values removed):")
print(data_cleaned)

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken for operations: {elapsed_time} seconds")

Minimum and Maximum PM2.5 values:
min      0.0
max    994.0
Name: pm2.5, dtype: float64

Cleaned DataFrame (NA values removed):
          No  year  month  day  hour  pm2.5  DEWP  TEMP    PRES cbwd     Iws  \
24        25  2010      1    2     0  129.0   -16  -4.0  1020.0   SE    1.79   
25        26  2010      1    2     1  148.0   -15  -4.0  1020.0   SE    2.68   
26        27  2010      1    2     2  159.0   -11  -5.0  1021.0   SE    3.57   
27        28  2010      1    2     3  181.0    -7  -5.0  1022.0   SE    5.36   
28        29  2010      1    2     4  138.0    -7  -5.0  1022.0   SE    6.25   
...      ...   ...    ...  ...   ...    ...   ...   ...     ...  ...     ...   
43819  43820  2014     12   31    19    8.0   -23  -2.0  1034.0   NW  231.97   
43820  43821  2014     12   31    20   10.0   -22  -3.0  1034.0   NW  237.78   
43821  43822  2014     12   31    21   10.0   -22  -3.0  1034.0   NW  242.70   
43822  43823  2014     12   31    22    8.0   -22  -4.0  1034.0   NW  24

In [37]:
start_time = time.time()
data_cleaned['timestamp'] = (
    data_cleaned['year'].astype(str) + '-' +
    data_cleaned['month'].apply(lambda x: str(x).zfill(2)) + '-' +
    data_cleaned['day'].apply(lambda x: str(x).zfill(2)) + ' ' +
    data_cleaned['hour'].apply(lambda x: str(x).zfill(2)) + ':00:00:000'
)
print("DataFrame with timestamp column:")
print(data_cleaned)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken for timestamp operation: {elapsed_time} seconds")

DataFrame with timestamp column:
          No  year  month  day  hour  pm2.5  DEWP  TEMP    PRES cbwd     Iws  \
24        25  2010      1    2     0  129.0   -16  -4.0  1020.0   SE    1.79   
25        26  2010      1    2     1  148.0   -15  -4.0  1020.0   SE    2.68   
26        27  2010      1    2     2  159.0   -11  -5.0  1021.0   SE    3.57   
27        28  2010      1    2     3  181.0    -7  -5.0  1022.0   SE    5.36   
28        29  2010      1    2     4  138.0    -7  -5.0  1022.0   SE    6.25   
...      ...   ...    ...  ...   ...    ...   ...   ...     ...  ...     ...   
43819  43820  2014     12   31    19    8.0   -23  -2.0  1034.0   NW  231.97   
43820  43821  2014     12   31    20   10.0   -22  -3.0  1034.0   NW  237.78   
43821  43822  2014     12   31    21   10.0   -22  -3.0  1034.0   NW  242.70   
43822  43823  2014     12   31    22    8.0   -22  -4.0  1034.0   NW  246.72   
43823  43824  2014     12   31    23   12.0   -21  -3.0  1034.0   NW  249.85   

      

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data_cleaned['timestamp'] = (


In [38]:
start_time = time.time()
data_cleaned['pm2.5'] = pd.to_numeric(data_cleaned['pm2.5'], errors='coerce')
aggregated_data = data_cleaned.groupby('timestamp')['pm2.5'].mean().reset_index()
print("Aggregated DataFrame:")
print(aggregated_data)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken for aggregation operation: {elapsed_time} seconds")

Aggregated DataFrame:
                     timestamp  pm2.5
0      2010-01-02 00:00:00:000  129.0
1      2010-01-02 01:00:00:000  148.0
2      2010-01-02 02:00:00:000  159.0
3      2010-01-02 03:00:00:000  181.0
4      2010-01-02 04:00:00:000  138.0
...                        ...    ...
41752  2014-12-31 19:00:00:000    8.0
41753  2014-12-31 20:00:00:000   10.0
41754  2014-12-31 21:00:00:000   10.0
41755  2014-12-31 22:00:00:000    8.0
41756  2014-12-31 23:00:00:000   12.0

[41757 rows x 2 columns]
Time taken for aggregation operation: 0.023965835571289062 seconds


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data_cleaned['pm2.5'] = pd.to_numeric(data_cleaned['pm2.5'], errors='coerce')


In [39]:
subset_data = data.sample(frac=0.1, random_state=42)

start_time_subset_analytics = time.time()
max_pm25_subset = subset_data['pm2.5'].max()
end_time_subset_analytics = time.time()
elapsed_time_subset_analytics = end_time_subset_analytics - start_time_subset_analytics

print(f"Maximum PM2.5 value on subset: {max_pm25_subset}")
print(f"Time taken for analytics on subset: {elapsed_time_subset_analytics} seconds")

Maximum PM2.5 value on subset: 824.0
Time taken for analytics on subset: 0.00028228759765625 seconds


In [40]:
start_time_full_analytics = time.time()
max_pm25_full = data['pm2.5'].max()
end_time_full_analytics = time.time()
elapsed_time_full_analytics = end_time_full_analytics - start_time_full_analytics

print(f"Maximum PM2.5 value on entire dataset: {max_pm25_full}")
print(f"Time taken for analytics on entire dataset: {elapsed_time_full_analytics} seconds")

Maximum PM2.5 value on entire dataset: 994.0
Time taken for analytics on entire dataset: 0.00044083595275878906 seconds
