In [8]:
from pyspark.sql import SparkSession

# Initialize Spark session with Delta Lake
spark = SparkSession.builder \
    .appName("LocalWineQualityAnalysis") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Load the dataset
df = spark.read.csv("../winequality-red.csv", header=True, sep=";")

# Show the first 5 rows
df.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|          0|           1.9|    0.076|                 11|                  34| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|          0|           2.6|    0.098|                 25|                  67| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|                 15|                  54|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|                 17|           

In [9]:
# Rename columns to replace spaces with underscores
new_column_names = [col.replace(" ", "_") for col in df.columns]
for old_name, new_name in zip(df.columns, new_column_names):
    df = df.withColumnRenamed(old_name, new_name)

# Verify the new column names
df.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed_acidity|volatile_acidity|citric_acid|residual_sugar|chlorides|free_sulfur_dioxide|total_sulfur_dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|          0|           1.9|    0.076|                 11|                  34| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|          0|           2.6|    0.098|                 25|                  67| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|                 15|                  54|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|                 17|           

In [10]:
# Filter for high-quality wines (quality >= 7)
high_quality_df = df.filter(df.quality >= 7)

# Calculate average alcohol content
avg_alcohol_high_quality = high_quality_df.agg({"alcohol": "avg"}).collect()[0][0]
print(f"Average alcohol content for wines with quality >= 7: {avg_alcohol_high_quality}")

# Show 5 rows
high_quality_df.show(5)

Average alcohol content for wines with quality >= 7: 11.518049155145922
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed_acidity|volatile_acidity|citric_acid|residual_sugar|chlorides|free_sulfur_dioxide|total_sulfur_dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.3|            0.65|          0|           1.2|    0.065|                 15|                  21| 0.9946|3.39|     0.47|     10|      7|
|          7.8|            0.58|       0.02|             2|    0.073|                  9|                  18| 0.9968|3.36|     0.57|    9.5|      7|
|          8.5|            0.28|       0.56|           1.8|    0.092|                 35|                 103| 0.9969| 3.3|     0.75|   10.5|      7|
|          8.1|            0

In [12]:
from pyspark.sql.types import FloatType, IntegerType

# Cast columns to appropriate types
numeric_cols = ["fixed_acidity", "volatile_acidity", "citric_acid", "residual_sugar", 
                "chlorides", "free_sulfur_dioxide", "total_sulfur_dioxide", "density", 
                "pH", "sulphates", "alcohol"]
for col in numeric_cols:
    df = df.withColumn(col, df[col].cast(FloatType()))
df = df.withColumn("quality", df["quality"].cast(IntegerType()))

# Verify schema
df.printSchema()

# Save as Delta table
df.write.format("delta").mode("overwrite").save("../delta/winequality-red")

# Load and verify
df_delta = spark.read.format("delta").load("../delta/winequality-red")
df_delta.show(5)

root
 |-- fixed_acidity: float (nullable = true)
 |-- volatile_acidity: float (nullable = true)
 |-- citric_acid: float (nullable = true)
 |-- residual_sugar: float (nullable = true)
 |-- chlorides: float (nullable = true)
 |-- free_sulfur_dioxide: float (nullable = true)
 |-- total_sulfur_dioxide: float (nullable = true)
 |-- density: float (nullable = true)
 |-- pH: float (nullable = true)
 |-- sulphates: float (nullable = true)
 |-- alcohol: float (nullable = true)
 |-- quality: integer (nullable = true)



25/05/19 20:00:37 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed_acidity|volatile_acidity|citric_acid|residual_sugar|chlorides|free_sulfur_dioxide|total_sulfur_dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

In [13]:
# Register the Delta table as a temporary view for SQL queries
df_delta.createOrReplaceTempView("winequality")

# Run a SQL query: Average alcohol by quality
result = spark.sql("""
    SELECT quality, AVG(alcohol) as avg_alcohol
    FROM winequality
    GROUP BY quality
    ORDER BY quality
""")
result.show()

+-------+------------------+
|quality|       avg_alcohol|
+-------+------------------+
|      3| 9.954999828338623|
|      4|10.265094343221412|
|      5| 9.899706287356025|
|      6|10.629519308622355|
|      7|11.465912914755357|
|      8|12.094444433848063|
+-------+------------------+



In [14]:
# Calculate overall average alcohol
overall_avg_alcohol = df_delta.agg({"alcohol": "avg"}).collect()[0][0]
print(f"Overall average alcohol: {overall_avg_alcohol}")

# Register the view (already done, but ensure it's active)
df_delta.createOrReplaceTempView("winequality")

# SQL query: Wines with alcohol above overall average, grouped by quality
result = spark.sql(f"""
    SELECT quality, COUNT(*) as count_above_avg
    FROM winequality
    WHERE alcohol > {overall_avg_alcohol}
    GROUP BY quality
    ORDER BY quality
""")
result.show()

Overall average alcohol: 10.422983095003262
+-------+---------------+
|quality|count_above_avg|
+-------+---------------+
|      3|              3|
|      4|             20|
|      5|            137|
|      6|            335|
|      7|            172|
|      8|             16|
+-------+---------------+



In [15]:
from pyspark.sql.functions import col

# Extract: High-quality wines (quality >= 7)
high_quality_df = df_delta.filter(col("quality") >= 7)

# Transform: Normalize alcohol
min_alcohol = high_quality_df.agg({"alcohol": "min"}).collect()[0][0]
max_alcohol = high_quality_df.agg({"alcohol": "max"}).collect()[0][0]
high_quality_df = high_quality_df.withColumn("alcohol_normalized", 
                                            (col("alcohol") - min_alcohol) / (max_alcohol - min_alcohol))

# Load: Save as a new Delta table
high_quality_df.write.format("delta").mode("overwrite").save("../delta/high_quality_wines")

# Verify the new table
new_df = spark.read.format("delta").load("../delta/high_quality_wines")
new_df.show(5)

                                                                                

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+-------------------+
|fixed_acidity|volatile_acidity|citric_acid|residual_sugar|chlorides|free_sulfur_dioxide|total_sulfur_dioxide|density|  pH|sulphates|alcohol|quality| alcohol_normalized|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+-------------------+
|          7.3|            0.65|        0.0|           1.2|    0.065|               15.0|                21.0| 0.9946|3.39|     0.47|   10.0|      7| 0.1666666997803569|
|          7.8|            0.58|       0.02|           2.0|    0.073|                9.0|                18.0| 0.9968|3.36|     0.57|    9.5|      7| 0.0625000372529015|
|          8.5|            0.28|       0.56|           1.8|    0.092|               35.0|               103.0| 0.9969| 3.3|     0.75|   10.5|      7| 