In [None]:

# Required libraries
from pyspark.sql import SparkSession

# Obtain the SparkSession used by every later cell, registered under the
# application name "example".
spark = (
    SparkSession.builder
    .appName("example")
    .getOrCreate()
)


In [None]:

# CSV source: the first line is treated as the header and column types are
# inferred from the data.
df_csv = spark.read.csv(
    "/path/to/csv/file",
    header=True,
    inferSchema=True,
)

# Parquet source.
df_parquet = spark.read.parquet("/path/to/parquet/file")

# Relational table read over JDBC. The URL, table name and credentials are
# placeholders.
# NOTE(review): real credentials should be supplied from the environment or a
# secrets store rather than being written into the notebook source.
df_db = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://host:port/dbname")
    .option("dbtable", "table_name")
    .option("user", "username")
    .option("password", "password")
    .load()
)


In [None]:

# Three-row demo frame; Alice's Age is missing (None).
data = [
    ("Alice", None),
    ("Bob", 45),
    ("Cathy", 29),
]
df = spark.createDataFrame(data, ["Name", "Age"])

# Discard every row that contains a null.
df_clean = df.dropna()

# Keep all rows, substituting 0 for a missing Age.
df_filled = df.fillna({"Age": 0})

# Re-type the Age column as an integer.
df_casted = df.withColumn(
    "Age",
    df.Age.cast("Integer"),
)

# Keep only the rows whose Age is greater than 10.
df_filtered = df.filter(df.Age > 10)


In [None]:

# Group by and aggregation: total Age per Name.
df_aggregated = df.groupBy("Name").agg({"Age": "sum"})

# Joins: two small frames that share the "Name" key.
df1 = spark.createDataFrame([("Alice", 34), ("Bob", 45)], ["Name", "Age"])
df2 = spark.createDataFrame([("Alice", "F"), ("Bob", "M")], ["Name", "Gender"])
# Join on the column *name* rather than on df1["Name"] == df2["Name"]: the
# expression form keeps both Name columns in the result, which makes any
# later reference to "Name" ambiguous. Joining by name yields a single key
# column (Name, Age, Gender).
df_joined = df1.join(df2, on="Name", how="inner")

# Pivoting: one output column per distinct Age value, holding the summed Age
# for each Name.
df_pivot = df.groupBy("Name").pivot("Age").sum("Age")


In [None]:

from pyspark.sql.functions import lit

# Attach a constant-valued "Country" column to every row of df.
df_enriched = df.withColumn(
    "Country",
    lit("USA"),
)

# Using UDF for custom transformations
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def custom_function(value):
    """Return ``value`` upper-cased, passing ``None`` through unchanged.

    Only ``None`` is treated as missing. An empty string is a real value and
    is returned as ``""`` instead of being turned into ``None``, which a
    plain truthiness check would do.
    """
    if value is None:
        return None
    return value.upper()

# Wrap the Python function as a Spark UDF whose result type is string.
udf_custom_function = udf(custom_function, StringType())
# Build on df_enriched (which already carries "Country") instead of starting
# again from df: re-deriving from df silently threw away the Country column
# added earlier in this cell.
df_enriched = df_enriched.withColumn(
    "Name_Upper",
    udf_custom_function(df_enriched["Name"]),
)


In [None]:

# Expose df to Spark SQL under the view name "people", then count rows per
# Name with a SQL statement.
df.createOrReplaceTempView("people")
df_sql = spark.sql(
    "SELECT Name, COUNT(*) FROM people GROUP BY Name"
)

# The same per-Name row count expressed through the DataFrame API.
# This rebinds df_aggregated.
df_aggregated = (
    df.groupBy("Name")
    .count()
)


In [None]:

# CSV output (path is a placeholder).
df.write.csv(
    "/path/to/output/csv"
)

# Parquet output (path is a placeholder).
df.write.parquet(
    "/path/to/output/parquet"
)

# Relational table written over JDBC. URL, table name and credentials are
# placeholders.
# NOTE(review): real credentials should be supplied from the environment or a
# secrets store rather than being written into the notebook source.
(
    df.write.format("jdbc")
    .option("url", "jdbc:postgresql://host:port/dbname")
    .option("dbtable", "table_name")
    .option("user", "username")
    .option("password", "password")
    .save()
)


In [None]:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Number the rows inside each Name partition, ordered by Age.
windowSpec = (
    Window
    .partitionBy("Name")
    .orderBy("Age")
)
df_with_row_number = df.withColumn(
    "row_number",
    row_number().over(windowSpec),
)

# Define a Python function
def to_upper(s):
    """Return ``s`` upper-cased, passing ``None`` through unchanged.

    Only ``None`` is treated as missing. An empty string is a real value and
    is returned as ``""`` instead of being turned into ``None``, which a
    plain truthiness check would do.
    """
    if s is None:
        return None
    return s.upper()

# Wrap to_upper as a Spark UDF with a string result type. `udf` and
# `StringType` are the names imported in an earlier cell.
to_upper_udf = udf(
    to_upper,
    StringType(),
)

# Apply the UDF to the Name column, storing the result as "Name_Upper".
df_with_upper = df.withColumn(
    "Name_Upper",
    to_upper_udf(col("Name")),
)


In [None]:

from pyspark.sql.functions import broadcast

# Broadcast join: mark df2 with a broadcast hint before joining it to df1.
# Joining on the column name (instead of df1["Name"] == df2["Name"]) keeps a
# single Name column in the result rather than two ambiguous ones.
df_joined = df1.join(broadcast(df2), on="Name")

# Mark df for caching at the default storage level.
df.cache()

# Request an explicit storage level for df.
# NOTE(review): df was already marked as cached on the line above; confirm
# that this second request actually changes the storage level.
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)

# Run an action so the cached data is actually materialized.
df.count()

# Redistribute df across 10 partitions.
df_repartitioned = df.repartition(numPartitions=10)

# Reduce df to 2 partitions.
df_coalesced = df.coalesce(numPartitions=2)


In [None]:

from pyspark.sql.functions import explode, split, struct, create_map, lit

# Array column: split each Name on commas into "Name_Array".
df_with_array = df.select(
    split(col("Name"), ",").alias("Name_Array")
)

# One output row per element of the array.
df_exploded = df_with_array.select(
    explode(col("Name_Array")).alias("Name")
)

# Struct column: bundle Name and Age into a single "Person" field.
df_with_struct = df.select(
    struct(col("Name"), col("Age")).alias("Person")
)

# Pull the individual fields back out of the struct using dotted paths.
df_selected = df_with_struct.select(
    col("Person.Name"),
    col("Person.Age"),
)

# Map column keyed by the literal strings "Name" and "Age".
# NOTE(review): the two map values come from columns of different types
# (Name is text, Age is numeric); confirm the value type Spark settles on is
# the one you want.
df_with_map = df.select(
    create_map(
        lit("Name"), col("Name"),
        lit("Age"), col("Age"),
    ).alias("Name_Age_Map")
)

# Look both keys up in the map; this rebinds df_selected.
df_selected = df_with_map.select(
    col("Name_Age_Map")["Name"].alias("Name"),
    col("Name_Age_Map")["Age"].alias("Age"),
)


In [None]:

from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Sample DataFrame with a binary Label column.
# Note: this rebinds `data` and `df`, replacing the Name/Age frame that the
# earlier cells operate on.
data = [("Alice", 1), ("Bob", 0), ("Cathy", 1)]
df = spark.createDataFrame(data, ["Name", "Label"])

# StringIndexer for categorical features (shown standalone for inspection).
indexer = StringIndexer(inputCol="Name", outputCol="NameIndex")
df_indexed = indexer.fit(df).transform(df)

# OneHotEncoder for indexed categorical features (shown standalone).
encoder = OneHotEncoder(inputCol="NameIndex", outputCol="NameVec")
df_encoded = encoder.fit(df_indexed).transform(df_indexed)

# Define the stages of the pipeline: index -> one-hot encode -> classify.
stages = [
    indexer,
    encoder,
    LogisticRegression(featuresCol="NameVec", labelCol="Label"),
]

# Create a pipeline
pipeline = Pipeline(stages=stages)

# Fit the pipeline on the RAW frame. The pipeline runs the indexer and the
# encoder itself, so feeding it df_encoded (which already contains NameIndex
# and NameVec) makes the indexer stage fail because its output column
# already exists.
model = pipeline.fit(df)

# Make predictions; the fitted pipeline applies the same feature stages.
# This scores the training rows themselves, so the metric below is not a
# held-out estimate.
predictions = model.transform(df)

# Initialize evaluator. The metric is spelled out explicitly: this evaluator
# reports area under the ROC curve, not classification accuracy.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="Label",
    metricName="areaUnderROC",
)

# Evaluate model
area_under_roc = evaluator.evaluate(predictions)
# Legacy name kept so any later cell that reads `accuracy` still works; the
# value is an area-under-ROC score.
accuracy = area_under_roc
print(f"Area under ROC: {area_under_roc}")
