In [0]:
from pyspark.sql.types import StructField, StructType, DoubleType, IntegerType, StringType

# wczytywanie danych
schema = StructType([
    StructField("fixed acidity", DoubleType(), False),
    StructField("volatile acidity", DoubleType(), False),
    StructField("citric acid", DoubleType(), False),
    StructField("residual sugar", DoubleType(), False),
    StructField("chlorides", DoubleType(), False),
    StructField("free sulfur dioxide", IntegerType(), False),
    StructField("total sulfur dioxide", IntegerType(), False),
    StructField("density", DoubleType(), False),
    StructField("pH", DoubleType(), False),
    StructField("suplhates", DoubleType(), False),
    StructField("alcohol", DoubleType(), False),
    StructField("quality", IntegerType(), False),
])

df = spark.read.format("csv").options(sep=";", header=True).schema(schema).load("dbfs:/databricks-datasets/wine-quality/winequality-red.csv")

display(df)

fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,suplhates,alcohol,quality
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5
7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5
11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,3.51,0.56,9.4,5
7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4,5
7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0,7
7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5,7
7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5,5


In [0]:
# psucie danych
from pyspark.sql.functions import * 
df = df.withColumn("pH", when(col("pH") > 3.5, None).otherwise(col("pH")))
display(df)

fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,suplhates,alcohol,quality
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,,0.56,9.4,5
7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5
7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5
11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,,0.56,9.4,5
7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,,0.56,9.4,5
7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4,5
7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0,7
7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5,7
7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5,5


In [0]:
display(df.na.fill(3.5))

fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,suplhates,alcohol,quality
7.4,0.7,0.0,1.9,0.076,11,34,0.9978,3.5,0.56,9.4,5
7.8,0.88,0.0,2.6,0.098,25,67,0.9968,3.2,0.68,9.8,5
7.8,0.76,0.04,2.3,0.092,15,54,0.997,3.26,0.65,9.8,5
11.2,0.28,0.56,1.9,0.075,17,60,0.998,3.16,0.58,9.8,6
7.4,0.7,0.0,1.9,0.076,11,34,0.9978,3.5,0.56,9.4,5
7.4,0.66,0.0,1.8,0.075,13,40,0.9978,3.5,0.56,9.4,5
7.9,0.6,0.06,1.6,0.069,15,59,0.9964,3.3,0.46,9.4,5
7.3,0.65,0.0,1.2,0.065,15,21,0.9946,3.39,0.47,10.0,7
7.8,0.58,0.02,2.0,0.073,9,18,0.9968,3.36,0.57,9.5,7
7.5,0.5,0.36,6.1,0.071,17,102,0.9978,3.35,0.8,10.5,5


In [0]:
exploded_df = df.withColumn("split", split(col("fixed acidity").cast(StringType()), "\\."))
exploded_df = exploded_df.withColumn("exploded", explode(col("split")))

display(exploded_df)

fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,suplhates,alcohol,quality,split,exploded
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5,"List(7, 4)",7
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5,"List(7, 4)",4
7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5,"List(7, 8)",7
7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5,"List(7, 8)",8
7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5,"List(7, 8)",7
7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5,"List(7, 8)",8
11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6,"List(11, 2)",11
11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6,"List(11, 2)",2
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5,"List(7, 4)",7
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5,"List(7, 4)",4


In [0]:
# drop col 
display(df.drop(col("pH")))

fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,suplhates,alcohol,quality
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,0.56,9.4,5
7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,0.68,9.8,5
7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,0.65,9.8,5
11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,0.58,9.8,6
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,0.56,9.4,5
7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,0.56,9.4,5
7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,0.46,9.4,5
7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,0.47,10.0,7
7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,0.57,9.5,7
7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,0.8,10.5,5


In [0]:
display(df.na.drop())

fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,suplhates,alcohol,quality
7.4,0.7,0.0,1.9,0.076,11,34,0.9978,3.51,0.56,9.4,5
7.8,0.88,0.0,2.6,0.098,25,67,0.9968,3.2,0.68,9.8,5
7.8,0.76,0.04,2.3,0.092,15,54,0.997,3.26,0.65,9.8,5
11.2,0.28,0.56,1.9,0.075,17,60,0.998,3.16,0.58,9.8,6
7.4,0.7,0.0,1.9,0.076,11,34,0.9978,3.51,0.56,9.4,5
7.4,0.66,0.0,1.8,0.075,13,40,0.9978,3.51,0.56,9.4,5
7.9,0.6,0.06,1.6,0.069,15,59,0.9964,3.3,0.46,9.4,5
7.3,0.65,0.0,1.2,0.065,15,21,0.9946,3.39,0.47,10.0,7
7.8,0.58,0.02,2.0,0.073,9,18,0.9968,3.36,0.57,9.5,7
7.5,0.5,0.36,6.1,0.071,17,102,0.9978,3.35,0.8,10.5,5


In [0]:
regexp_df = df.withColumn("dot2comma", regexp_replace("fixed acidity", "\\.", ","))
display(regexp_df)

fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,suplhates,alcohol,quality,dot2comma
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5,74
7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5,78
7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5,78
11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6,112
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5,74
7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,3.51,0.56,9.4,5,74
7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4,5,79
7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0,7,73
7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5,7,78
7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5,5,75


In [0]:
regexp_extracted_df = df.withColumn("extracted_floating_points", regexp_extract("fixed acidity", "(\d+)\\.(\d+)", 1))

display(regexp_extracted_df)

fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,suplhates,alcohol,quality,extracted_floating_points
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5,7
7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5,7
7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5,7
11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6,11
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5,7
7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,3.51,0.56,9.4,5,7
7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4,5,7
7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0,7,7
7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5,7,7
7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5,5,7


In [0]:
display(df.withColumn("ifnull", ifnull(df.pH, lit(9))))

fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,suplhates,alcohol,quality,ifnull
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,,0.56,9.4,5,9.0
7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5,3.2
7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5,3.26
11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6,3.16
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,,0.56,9.4,5,9.0
7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,,0.56,9.4,5,9.0
7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4,5,3.3
7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0,7,3.39
7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5,7,3.36
7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5,5,3.35


In [0]:
display(df.withColumn("nullif", nullif(col("total sulfur dioxide"), col("free sulfur dioxide"))))

fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,suplhates,alcohol,quality,nullif
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,,0.56,9.4,5,34.0
7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5,67.0
7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5,54.0
11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6,60.0
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,,0.56,9.4,5,34.0
7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,,0.56,9.4,5,40.0
7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4,5,59.0
7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0,7,21.0
7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5,7,18.0
7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5,5,102.0


In [0]:
display(regexp_extracted_df.withColumn("replace", replace(col("extracted_floating_points"), lit(7), lit("hehe"))))

fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,suplhates,alcohol,quality,extracted_floating_points,replace
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5,7,hehe
7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5,7,hehe
7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5,7,hehe
11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6,11,11
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5,7,hehe
7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,3.51,0.56,9.4,5,7,hehe
7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4,5,7,hehe
7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0,7,7,hehe
7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5,7,7,hehe
7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5,5,7,hehe


In [0]:
display(df.withColumn("array_", array([col("pH"), col("density")])).withColumn("array_contains", array_contains(col("array_"), lit(5))))

fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,suplhates,alcohol,quality,array_,array_contains
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,,0.56,9.4,5,"List(null, 0.9978)",
7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5,"List(3.2, 0.9968)",False
7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5,"List(3.26, 0.997)",False
11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6,"List(3.16, 0.998)",False
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,,0.56,9.4,5,"List(null, 0.9978)",
7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,,0.56,9.4,5,"List(null, 0.9978)",
7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4,5,"List(3.3, 0.9964)",False
7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0,7,"List(3.39, 0.9946)",False
7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5,7,"List(3.36, 0.9968)",False
7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5,5,"List(3.35, 0.9978)",False


In [0]:
display(df.select(avg("pH")))

avg(pH)
3.2783717679944093


In [0]:
display(df.select(collect_set("pH")))

collect_set(pH)
"List(2.89, 2.86, 3.31, 3.16, 3.33, 3.41, 2.99, 3.47, 3.48, 3.08, 2.74, 3.12, 3.1, 3.06, 2.9, 2.87, 2.95, 3.37, 2.93, 3.36, 3.11, 3.34, 3.15, 3.09, 3.35, 3.4, 3.42, 3.03, 3.28, 2.92, 3.26, 3.17, 2.94, 3.25, 3.24, 3.0, 3.05, 3.3, 3.02, 3.14, 3.27, 3.39, 3.01, 3.38, 3.21, 3.44, 3.19, 3.5, 3.29, 2.88, 3.49, 3.23, 3.22, 3.13, 3.46, 2.98, 3.04, 3.32, 3.18, 3.07, 3.45, 3.43, 3.2)"


In [0]:
display(df.select(sum_distinct(col("fixed acidity"))))

sum(DISTINCT fixed acidity)
914.4


In [0]:
def dot2comma(value):
    return str(value).replace(".", ",")

dot2comma_udf = udf(dot2comma, StringType())

In [0]:
display(df.withColumn("ph_comma", dot2comma_udf(col("pH"))))

fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,suplhates,alcohol,quality,ph_comma
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,,0.56,9.4,5,
7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5,32.0
7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5,326.0
11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6,316.0
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,,0.56,9.4,5,
7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,,0.56,9.4,5,
7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4,5,33.0
7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0,7,339.0
7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5,7,336.0
7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5,5,335.0


In [0]:
import pandas as pd

@pandas_udf(DoubleType())
def calc_sqrt(series: pd.Series) -> pd.Series:
    return pd.Series(np.sqrt(series))

In [0]:
display(df.withColumn("sqrt_ph", calc_sqrt(col("pH"))))

fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,suplhates,alcohol,quality,sqrt_ph
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,,0.56,9.4,5,
7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5,1.7888543819998317
7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5,1.8055470085267789
11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6,1.7776388834631178
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,,0.56,9.4,5,
7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,,0.56,9.4,5,
7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4,5,1.816590212458495
7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0,7,1.841195263952197
7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5,7,1.833030277982336
7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5,5,1.8303005217723127
