In [5]:
# Importing necessary PySpark modules and classes for data handling and ML
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.stat import Correlation
from pyspark.ml import Pipeline


In [6]:
# Create or retrieve a SparkSession
spark = SparkSession.builder \
    .appName("Product Data") \
    .getOrCreate()

25/05/19 01:51:56 WARN Utils: Your hostname, Precision-M6700 resolves to a loopback address: 127.0.1.1; using 192.168.43.246 instead (on interface wlp3s0)
25/05/19 01:51:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/19 01:51:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
# Define the schema for a sample product dataset
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("prix_eur", FloatType(), False),
    StructField("technologie", StringType(), False),
    StructField("score", FloatType(), False),
    StructField("statut", StringType(), False)
])

# Sample data for the products
data = [
    (101, 1499.99, "Laptop", 8.7, "en vente"),
    (102, 199.00, "Drone", 9.1, "epuise"),
    (103, 749.50, "Laptop", 7.9, "en vente"),
    (104, 299.95, "Drone", 8.3, "epuise"),
    (105, 129.99, "Smartwatch", 7.5, "en vente"),
    (106, 999.00, "Laptop", 9.4, "epuise")
]

In [9]:
# Create a DataFrame using the schema and sample data
df = spark.createDataFrame(data, schema=schema, verifySchema=True)

In [10]:
# Print schema
df.printSchema()

root
 |-- id: integer (nullable = false)
 |-- prix_eur: float (nullable = false)
 |-- technologie: string (nullable = false)
 |-- score: float (nullable = false)
 |-- statut: string (nullable = false)



In [11]:
# Preview the DataFrame
df.show(truncate=False)

+---+--------+-----------+-----+--------+
|id |prix_eur|technologie|score|statut  |
+---+--------+-----------+-----+--------+
|101|1499.99 |Laptop     |8.7  |en vente|
|102|199.0   |Drone      |9.1  |epuise  |
|103|749.5   |Laptop     |7.9  |en vente|
|104|299.95  |Drone      |8.3  |epuise  |
|105|129.99  |Smartwatch |7.5  |en vente|
|106|999.0   |Laptop     |9.4  |epuise  |
+---+--------+-----------+-----+--------+



In [12]:
# Index categorical columns: 'technologie' and 'statut'
tech_indexer = StringIndexer(inputCol="technologie", outputCol="technologie_idx", stringOrderType="frequencyAsc")
statut_indexer = StringIndexer(inputCol="statut", outputCol="statut_idx", stringOrderType="frequencyAsc")

# Create a pipeline for indexers
pipeline = Pipeline(stages=[tech_indexer, statut_indexer])
indexed_df = pipeline.fit(df).transform(df)

# Show indexed columns
indexed_df.select("id", "prix_eur", "technologie_idx", "score", "statut_idx").show(truncate=False)

                                                                                

+---+--------+---------------+-----+----------+
|id |prix_eur|technologie_idx|score|statut_idx|
+---+--------+---------------+-----+----------+
|101|1499.99 |2.0            |8.7  |0.0       |
|102|199.0   |1.0            |9.1  |1.0       |
|103|749.5   |2.0            |7.9  |0.0       |
|104|299.95  |1.0            |8.3  |1.0       |
|105|129.99  |0.0            |7.5  |0.0       |
|106|999.0   |2.0            |9.4  |1.0       |
+---+--------+---------------+-----+----------+



In [13]:
# One-hot encode indexed categorical columns
ohe_encoder = OneHotEncoder(
    inputCols=["technologie_idx", "statut_idx"],
    outputCols=["technologie_vec", "statut_vec"]
)

# Extend pipeline to include encoding
pipeline = Pipeline(stages=[tech_indexer, statut_indexer, ohe_encoder])
encoded_df = pipeline.fit(df).transform(df)

# Show one-hot encoded columns
encoded_df.select(
    "id", "prix_eur", "technologie_idx", "technologie_vec",
    "score", "statut_idx", "statut_vec"
).show(truncate=False)

+---+--------+---------------+---------------+-----+----------+-------------+
|id |prix_eur|technologie_idx|technologie_vec|score|statut_idx|statut_vec   |
+---+--------+---------------+---------------+-----+----------+-------------+
|101|1499.99 |2.0            |(2,[],[])      |8.7  |0.0       |(1,[0],[1.0])|
|102|199.0   |1.0            |(2,[1],[1.0])  |9.1  |1.0       |(1,[],[])    |
|103|749.5   |2.0            |(2,[],[])      |7.9  |0.0       |(1,[0],[1.0])|
|104|299.95  |1.0            |(2,[1],[1.0])  |8.3  |1.0       |(1,[],[])    |
|105|129.99  |0.0            |(2,[0],[1.0])  |7.5  |0.0       |(1,[0],[1.0])|
|106|999.0   |2.0            |(2,[],[])      |9.4  |1.0       |(1,[],[])    |
+---+--------+---------------+---------------+-----+----------+-------------+



In [14]:
# Assemble selected columns into a feature vector
assembler = VectorAssembler(
    inputCols=["prix_eur", "technologie_idx", "score"],
    outputCol="features"
)

# Apply transformation
output = assembler.transform(indexed_df)

# Show resulting features
output.select(col("id"), col("features")).show()


+---+--------------------+
| id|            features|
+---+--------------------+
|101|[1499.98999023437...|
|102|[199.0,1.0,9.1000...|
|103|[749.5,2.0,7.9000...|
|104|[299.950012207031...|
|105|[129.990005493164...|
|106|[999.0,2.0,9.3999...|
+---+--------------------+



In [18]:
import os

csv_path = "file://" + os.path.abspath("heart.csv")
data = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(csv_path)
)

In [19]:
# Show sample rows
data.show(5, truncate=False)

+---+---+---+------+----+---+-------+--------+----+-------+---+---+-----+------+
|age|sex|cp |trtbps|chol|fbs|restecg|thalachh|exng|oldpeak|slp|caa|thall|output|
+---+---+---+------+----+---+-------+--------+----+-------+---+---+-----+------+
|63 |1  |3  |145   |233 |1  |0      |150     |0   |2.3    |0  |0  |1    |1     |
|37 |1  |2  |130   |250 |0  |1      |187     |0   |3.5    |0  |0  |2    |1     |
|41 |0  |1  |130   |204 |0  |0      |172     |0   |1.4    |2  |0  |2    |1     |
|56 |1  |1  |120   |236 |0  |1      |178     |0   |0.8    |2  |0  |2    |1     |
|57 |0  |0  |120   |354 |0  |1      |163     |1   |0.6    |2  |0  |2    |1     |
+---+---+---+------+----+---+-------+--------+----+-------+---+---+-----+------+
only showing top 5 rows



In [20]:
# Create a temporary SQL view
data.createOrReplaceTempView("data")

# Count rows and columns
nb_lines = data.count()
nb_columns = len(data.columns)

# Print schema
data.printSchema()

# Count unique values per column
unique_counts = [(col, data.select(col).distinct().count()) for col in data.columns]
unique_counts_df = spark.createDataFrame([Row(column=col, unique_values=count) for col, count in unique_counts])
unique_counts_df.show(truncate=False)


root
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- cp: integer (nullable = true)
 |-- trtbps: integer (nullable = true)
 |-- chol: integer (nullable = true)
 |-- fbs: integer (nullable = true)
 |-- restecg: integer (nullable = true)
 |-- thalachh: integer (nullable = true)
 |-- exng: integer (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slp: integer (nullable = true)
 |-- caa: integer (nullable = true)
 |-- thall: integer (nullable = true)
 |-- output: integer (nullable = true)

+--------+-------------+
|column  |unique_values|
+--------+-------------+
|age     |41           |
|sex     |2            |
|cp      |4            |
|trtbps  |49           |
|chol    |152          |
|fbs     |2            |
|restecg |3            |
|thalachh|91           |
|exng    |2            |
|oldpeak |40           |
|slp     |3            |
|caa     |5            |
|thall   |4            |
|output  |2            |
+--------+-------------+



In [21]:
# Separate columns into categorical and constant (numerical)
categorical_cols = ["sex", "exng", "caa", "cp", "fbs", "restecg", "slp", "thall"]
constant_cols = ["age", "trtbps", "chol", "thalachh", "oldpeak"]
target_col = ["output"]

# Summary statistics for numerical columns
data.select(constant_cols).describe().show()

25/05/19 02:03:11 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------------------+------------------+------------------+------------------+------------------+
|summary|               age|            trtbps|              chol|          thalachh|           oldpeak|
+-------+------------------+------------------+------------------+------------------+------------------+
|  count|               303|               303|               303|               303|               303|
|   mean|54.366336633663366|131.62376237623764|246.26402640264027|149.64686468646866|1.0396039603960396|
| stddev|  9.08210098983786|  17.5381428135171| 51.83075098793005| 22.90516111491409|1.1610750220686346|
|    min|                29|                94|               126|                71|               0.0|
|    max|                77|               200|               564|               202|               6.2|
+-------+------------------+------------------+------------------+------------------+------------------+



In [22]:
# Count nulls in each column
null_counts_df = data.select([sum(col(c).isNull().cast("int")).alias(c) for c in data.columns])
null_counts_df.show()

# Count total and distinct rows
total_rows = data.count()
distinct_rows = data.distinct().count()
duplicated_rows = total_rows - distinct_rows

# Show distribution of target variable
data.groupBy("output").count().show()

+---+---+---+------+----+---+-------+--------+----+-------+---+---+-----+------+
|age|sex| cp|trtbps|chol|fbs|restecg|thalachh|exng|oldpeak|slp|caa|thall|output|
+---+---+---+------+----+---+-------+--------+----+-------+---+---+-----+------+
|  0|  0|  0|     0|   0|  0|      0|       0|   0|      0|  0|  0|    0|     0|
+---+---+---+------+----+---+-------+--------+----+-------+---+---+-----+------+

+------+-----+
|output|count|
+------+-----+
|     1|  165|
|     0|  138|
+------+-----+



In [23]:
# Assemble all features into one vector for correlation analysis
assembler = VectorAssembler(inputCols=data.columns, outputCol="features")
vector_df = assembler.transform(data).select("features")

# Compute correlation matrix
correlation_matrix = Correlation.corr(vector_df, "features").head()[0]
corr_array = correlation_matrix.toArray()
corr_array

25/05/19 02:03:24 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

array([[ 1.        , -0.0984466 , -0.06865302,  0.27935091,  0.21367796,
         0.12130765, -0.1162109 , -0.39852194,  0.09680083,  0.21001257,
        -0.16881424,  0.27632624,  0.06800138, -0.22543872],
       [-0.0984466 ,  1.        , -0.04935288, -0.05676882, -0.19791217,
         0.04503179, -0.05819627, -0.04401991,  0.14166381,  0.09609288,
        -0.03071057,  0.11826141,  0.2100411 , -0.28093658],
       [-0.06865302, -0.04935288,  1.        ,  0.04760776, -0.07690439,
         0.09444403,  0.04442059,  0.29576212, -0.39428027, -0.14923016,
         0.11971659, -0.18105303, -0.16173557,  0.43379826],
       [ 0.27935091, -0.05676882,  0.04760776,  1.        ,  0.12317421,
         0.17753054, -0.11410279, -0.04669773,  0.06761612,  0.19321647,
        -0.12147458,  0.10138899,  0.06220989, -0.14493113],
       [ 0.21367796, -0.19791217, -0.07690439,  0.12317421,  1.        ,
         0.0132936 , -0.15104008, -0.00993984,  0.06702278,  0.05395192,
        -0.00403777,  0.07

In [24]:
# Drop duplicate rows if any
data.count()
data = data.dropDuplicates()
data.count()

302

In [25]:
# Assemble selected features and scale them
assembler = VectorAssembler(
    inputCols=categorical_cols + constant_cols,
    outputCol="features"
)

scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withStd=True,
    withMean=True
)

# Pipeline for assembling and scaling
pipeline = Pipeline(stages=[assembler, scaler])
scaled_data = pipeline.fit(data).transform(data)

# Show scaled features
scaled_data.show(5)

+---+---+---+------+----+---+-------+--------+----+-------+---+---+-----+------+--------------------+--------------------+
|age|sex| cp|trtbps|chol|fbs|restecg|thalachh|exng|oldpeak|slp|caa|thall|output|            features|     scaled_features|
+---+---+---+------+----+---+-------+--------+----+-------+---+---+-----+------+--------------------+--------------------+
| 69|  0|  3|   140| 239|  0|      1|     151|   0|    1.8|  2|  2|    2|     1|[0.0,0.0,2.0,3.0,...|[-1.4624390328984...|
| 53|  0|  0|   130| 264|  0|      0|     143|   0|    0.4|  1|  0|    2|     1|(13,[6,7,8,9,10,1...|[-1.4624390328984...|
| 54|  1|  2|   125| 273|  0|      0|     152|   0|    0.5|  0|  1|    2|     1|[1.0,0.0,1.0,2.0,...|[0.68152498620508...|
| 51|  1|  0|   140| 298|  0|      1|     122|   1|    4.2|  1|  3|    3|     0|[1.0,1.0,3.0,0.0,...|[0.68152498620508...|
| 58|  0|  1|   136| 319|  1|      0|     152|   0|    0.0|  2|  2|    2|     0|[0.0,0.0,2.0,1.0,...|[-1.4624390328984...|
+---+---+---+---

In [26]:
# Split dataset into training and testing sets
train_data, test_data = scaled_data.randomSplit([0.8, 0.2], seed=1234)