In [1]:
from pyspark.sql import SparkSession

# Initialize a SparkSession
spark = SparkSession.builder.appName("wine_app").getOrCreate()

# Read the CSV file into a DataFrame, specifying the separator
wine_x1 = spark.read.csv("/home/centos/Downloads/wine.csv", header=True, inferSchema=True, sep=";")

# Show the first few rows of the DataFrame
print(wine_x1.head())

Row(fixed acidity=7.4, volatile acidity=0.7, citric acid=0.0, residual sugar=1.9, chlorides=0.076, free sulfur dioxide=11.0, total sulfur dioxide=34.0, density=0.9978, pH=3.51, sulphates=0.56, alcohol=9.4, quality=5)


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col

spark = SparkSession.builder.appName("Wine Data Analysis").getOrCreate()

# a. Printout the Names of Columns
print("Column Names: ", wine_x1.columns)

Column Names:  ['fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar', 'chlorides', 'free sulfur dioxide', 'total sulfur dioxide', 'density', 'pH', 'sulphates', 'alcohol', 'quality']


In [3]:
# b. Printout the Types of Each Column
print("Column Types: ", wine_x1.printSchema())

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)

Column Types:  None


In [4]:
# c. Printout the Basic Statistics (Mean, Median, Quartiles)
wine_x1.describe().show()

+-------+------------------+-------------------+-------------------+------------------+--------------------+-------------------+--------------------+--------------------+-------------------+------------------+------------------+------------------+
|summary|     fixed acidity|   volatile acidity|        citric acid|    residual sugar|           chlorides|free sulfur dioxide|total sulfur dioxide|             density|                 pH|         sulphates|           alcohol|           quality|
+-------+------------------+-------------------+-------------------+------------------+--------------------+-------------------+--------------------+--------------------+-------------------+------------------+------------------+------------------+
|  count|              1599|               1599|               1599|              1599|                1599|               1599|                1599|                1599|               1599|              1599|              1599|              1599|
|   mean

In [5]:
# d. Printout the Minimum and Maximum Value for Each Column
for column in wine_x1.columns:
    wine_x1.select(column).summary("min", "max").show()

+-------+-------------+
|summary|fixed acidity|
+-------+-------------+
|    min|          4.6|
|    max|         15.9|
+-------+-------------+

+-------+----------------+
|summary|volatile acidity|
+-------+----------------+
|    min|            0.12|
|    max|            1.58|
+-------+----------------+

+-------+-----------+
|summary|citric acid|
+-------+-----------+
|    min|        0.0|
|    max|        1.0|
+-------+-----------+

+-------+--------------+
|summary|residual sugar|
+-------+--------------+
|    min|           0.9|
|    max|          15.5|
+-------+--------------+

+-------+---------+
|summary|chlorides|
+-------+---------+
|    min|    0.012|
|    max|    0.611|
+-------+---------+

+-------+-------------------+
|summary|free sulfur dioxide|
+-------+-------------------+
|    min|                1.0|
|    max|               72.0|
+-------+-------------------+

+-------+--------------------+
|summary|total sulfur dioxide|
+-------+--------------------+
|    min|    

In [6]:
# e. Generate and Printout a Table Showing the Number of Missing Values for Each Column
wine_x1.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in wine_x1.columns]).show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density| pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+
|            0|               0|          0|             0|        0|                  0|                   0|      0|  0|        0|      0|      0|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+



In [7]:
# Show all the distinct values in the "quality" column
distinct_qualities = wine_x1.select("quality").distinct()
distinct_qualities.show()

+-------+
|quality|
+-------+
|      6|
|      3|
|      5|
|      4|
|      8|
|      7|
+-------+



In [8]:
from pyspark.sql import functions as F

# Group by 'quality' and calculate the mean for each chemical composition
mean_compositions = wine_x1.groupBy("quality").agg(
    F.mean('fixed acidity').alias('mean_fixed_acidity'),
    F.mean('volatile acidity').alias('mean_volatile_acidity'),
    F.mean('citric acid').alias('mean_citric_acid'),
    F.mean('residual sugar').alias('mean_residual_sugar'),
    F.mean('chlorides').alias('mean_chlorides'),
    F.mean('free sulfur dioxide').alias('mean_free_sulfur_dioxide'),
    F.mean('total sulfur dioxide').alias('mean_total_sulfur_dioxide'),
    F.mean('density').alias('mean_density'),
    F.mean('pH').alias('mean_pH'),
    F.mean('sulphates').alias('mean_sulphates'),
    F.mean('alcohol').alias('mean_alcohol')
)

# Show the results
mean_compositions.show()

+-------+------------------+---------------------+-------------------+-------------------+-------------------+------------------------+-------------------------+------------------+------------------+------------------+------------------+
|quality|mean_fixed_acidity|mean_volatile_acidity|   mean_citric_acid|mean_residual_sugar|     mean_chlorides|mean_free_sulfur_dioxide|mean_total_sulfur_dioxide|      mean_density|           mean_pH|    mean_sulphates|      mean_alcohol|
+-------+------------------+---------------------+-------------------+-------------------+-------------------+------------------------+-------------------------+------------------+------------------+------------------+------------------+
|      6| 8.347178683385575|  0.49748432601880965| 0.2738244514106587|  2.477194357366772|0.08495611285266458|      15.711598746081505|        40.86990595611285|0.9966150626959255|3.3180721003134837|0.6753291536050158|10.629519331243463|
|      3|              8.36|   0.884500000000000

In [9]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

# Initialize the SparkSession
spark = SparkSession.builder.appName("Wine Data Analysis").getOrCreate()

# Reload the wine dataset
wine_x = spark.read.csv("/home/centos/Downloads/wine.csv", header=True, inferSchema=True, sep=";")

# Define the VectorAssembler with the desired columns
assembler = VectorAssembler(inputCols=["citric acid", "volatile acidity", "chlorides", "sulphates"], outputCol="feature_x")

# Transform the dataset to add the 'feature_x' column
wine_x = assembler.transform(wine_x)

# Repartition the DataFrame into 3 RDD partitions
wine_x = wine_x.coalesce(3)

# Show the resulting DataFrame
wine_x.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|           feature_x|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|[0.0,0.7,0.076,0.56]|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|[0.0,0.88,0.098,0...|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|    

In [10]:
# Cache the DataFrame
wine_x.cache()

# To verify
wine_x.count()

1599

In [11]:
from pyspark.ml.clustering import KMeans

# Define the KMeans estimator
kmeans = KMeans().setK(6).setSeed(1).setFeaturesCol("feature_x")

# Fit the model to the data
model = kmeans.fit(wine_x)

In [12]:
# Transform the DataFrame to include cluster predictions
transformed = model.transform(wine_x)

# Count the number of instances in each cluster
cluster_sizes = transformed.groupBy('prediction').count().orderBy('prediction')
print("Cluster Sizes for 6 Clusters: ")
cluster_sizes.show()

# Access and print the cluster centroids
centroids = model.clusterCenters()
print("Cluster Sizes for 6 Clusters: ")
for i, center in enumerate(centroids):
    print(f"Cluster {i}: {center}")

Cluster Sizes for 6 Clusters: 
+----------+-----+
|prediction|count|
+----------+-----+
|         0|  400|
|         1|  252|
|         2|    1|
|         3|  605|
|         4|   49|
|         5|  292|
+----------+-----+

Cluster Sizes for 6 Clusters: 
Cluster 0: [0.434875 0.42475  0.08378  0.5968  ]
Cluster 1: [0.10079365 0.81123016 0.08349603 0.55968254]
Cluster 2: [1.   0.52 0.61 2.  ]
Cluster 3: [0.1308595  0.56433884 0.08129587 0.61791736]
Cluster 4: [0.44795918 0.51755102 0.21959184 1.24142857]
Cluster 5: [0.45143836 0.3505137  0.08476712 0.80804795]


In [13]:
# Define the KMeans estimator with 4 clusters
kmeans_4 = KMeans().setK(4).setSeed(1).setFeaturesCol("feature_x")

# Fit the model to the data
model_4 = kmeans_4.fit(wine_x)

# Transform the DataFrame to include cluster predictions
transformed_4 = model_4.transform(wine_x)

# Count the number of instances in each cluster
cluster_sizes_4 = transformed_4.groupBy('prediction').count().orderBy('prediction')
print("Cluster Sizes for 4 Clusters:")
cluster_sizes_4.show()

# Access and print the cluster centroids
centroids_4 = model_4.clusterCenters()
print("Cluster Centroids for 4 Clusters:")
for i, center in enumerate(centroids_4):
    print(f"Cluster {i}: {center}")

Cluster Sizes for 4 Clusters:
+----------+-----+
|prediction|count|
+----------+-----+
|         0|  619|
|         1|   55|
|         2|  470|
|         3|  455|
+----------+-----+

Cluster Centroids for 4 Clusters:
Cluster 0: [0.08555732 0.68045382 0.08275637 0.59882166]
Cluster 1: [0.46037037 0.51185185 0.21912963 1.2387037 ]
Cluster 2: [0.2926569  0.48105649 0.08201255 0.60267782]
Cluster 3: [0.48931663 0.36235763 0.08394761 0.73200456]
