In [1]:
import os
import sys

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.window import Window

In [2]:
# Make Spark start its Python workers with the same interpreter that runs this
# kernel. Without this, RDD actions later in the notebook fail with
# "Python worker failed to connect back" (see the traceback under rdd.take(10)).
# setdefault keeps any value that was already exported in the environment.
# This must run before the first session is created.
os.environ.setdefault("PYSPARK_PYTHON", sys.executable)

conf = SparkConf()
# NOTE: despite its name, this variable holds a SparkSession built from `conf`.
spark_context = SparkSession.builder.config(conf=conf).getOrCreate()

In [3]:
# Disabled alternative: connect to a standalone cluster and request the
# hadoop-aws package. "spark.jars.packages" takes Maven coordinates in
# group:artifact:version form.
#spark = SparkSession.builder.master("spark://spark-master:7077").config("spark.jars.packages", 
                                                                        #"org.apache.hadoop:hadoop-aws:2.7.3").appName("spark-example").getOrCreate()
# Session used by the rest of the notebook.
# NOTE(review): a session was already built in the previous cell, so
# getOrCreate() presumably returns that one and the master setting here may
# not take effect - confirm.
spark = SparkSession.builder.master("local[*]").getOrCreate()

## Load data

### Load CSV file into DataFrame


In [4]:
# Path of the Oscars CSV, relative to the notebook.
csv_file = "../data/oscars.csv"

# Read the file with a header row and let Spark infer the column types.
df_csv = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(csv_file)
)

df_csv.show()

+--------+--------+------------+-------------------+--------------------+---+-------------+----------+--------+-------+-------+
|oscar_no|oscar_yr|       award|               name|               movie|age|     birth_pl|birth_date|birth_mo|birth_d|birth_y|
+--------+--------+------------+-------------------+--------------------+---+-------------+----------+--------+-------+-------+
|       1|    1929|Best actress|       Janet Gaynor|          7th Heaven| 22| Pennsylvania|1906-10-06|      10|      6|   1906|
|       2|    1930|Best actress|      Mary Pickford|            Coquette| 37|       Canada|1892-04-08|       4|      8|   1892|
|       3|    1931|Best actress|      Norma Shearer|        The Divorcee| 28|       Canada|1902-08-10|       8|     10|   1902|
|       4|    1932|Best actress|     Marie Dressler|        Min and Bill| 63|       Canada|1868-11-09|      11|      9|   1868|
|       5|    1933|Best actress|        Helen Hayes|The Sin of Madelo...| 32|Washington DC|1900-10-10|  

In [5]:
# Print the extended plan (parsed, analyzed, optimized and physical).
df_csv.explain(extended=True)

== Parsed Logical Plan ==
Relation [oscar_no#17,oscar_yr#18,award#19,name#20,movie#21,age#22,birth_pl#23,birth_date#24,birth_mo#25,birth_d#26,birth_y#27] csv

== Analyzed Logical Plan ==
oscar_no: int, oscar_yr: int, award: string, name: string, movie: string, age: int, birth_pl: string, birth_date: date, birth_mo: int, birth_d: int, birth_y: int
Relation [oscar_no#17,oscar_yr#18,award#19,name#20,movie#21,age#22,birth_pl#23,birth_date#24,birth_mo#25,birth_d#26,birth_y#27] csv

== Optimized Logical Plan ==
Relation [oscar_no#17,oscar_yr#18,award#19,name#20,movie#21,age#22,birth_pl#23,birth_date#24,birth_mo#25,birth_d#26,birth_y#27] csv

== Physical Plan ==
FileScan csv [oscar_no#17,oscar_yr#18,award#19,name#20,movie#21,age#22,birth_pl#23,birth_date#24,birth_mo#25,birth_d#26,birth_y#27] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/ihor/dev/repos/r_d-de-course/lec14/data/oscars.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: 

In [6]:
# Print the column names and types that were inferred for the CSV.
df_csv.printSchema()

root
 |-- oscar_no: integer (nullable = true)
 |-- oscar_yr: integer (nullable = true)
 |-- award: string (nullable = true)
 |-- name: string (nullable = true)
 |-- movie: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- birth_pl: string (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- birth_mo: integer (nullable = true)
 |-- birth_d: integer (nullable = true)
 |-- birth_y: integer (nullable = true)



In [7]:
# Explicit three-column schema applied to the CSV instead of inference.
# The third CSV column is `award` (text), so declaring it as an integer
# named "age" is deliberately wrong: every value is read as NULL.
schema = (
    StructType()
    .add("id", IntegerType(), True)
    .add("year", StringType(), True)
    .add("age", IntegerType(), True)  # Wrong type on purpose
)

df_csv_with_schema = spark.read.schema(schema).csv(csv_file, header=True)

df_csv_with_schema.show()

+---+----+----+
| id|year| age|
+---+----+----+
|  1|1929|NULL|
|  2|1930|NULL|
|  3|1931|NULL|
|  4|1932|NULL|
|  5|1933|NULL|
|  6|1934|NULL|
|  7|1935|NULL|
|  8|1936|NULL|
|  9|1937|NULL|
| 10|1938|NULL|
| 11|1939|NULL|
| 12|1940|NULL|
| 13|1941|NULL|
| 14|1942|NULL|
| 15|1943|NULL|
| 16|1944|NULL|
| 17|1945|NULL|
| 18|1946|NULL|
| 19|1947|NULL|
| 20|1948|NULL|
+---+----+----+
only showing top 20 rows



In [8]:
# Confirm that the DataFrame carries the explicit schema defined above.
df_csv_with_schema.printSchema()

root
 |-- id: integer (nullable = true)
 |-- year: string (nullable = true)
 |-- age: integer (nullable = true)



In [9]:
# Load the CSV again, this time spelling out the reader options explicitly.
csv_options = {
    "header": 'True',
    "inferSchema": 'True',
    "delimiter": ',',
    "quote": '"',
    "dateFormat": 'yyyy-MM-dd',
    "escape": '\\',
}
df_csv_options = spark.read.options(**csv_options).csv(csv_file)

df_csv_options.show()

+--------+--------+------------+-------------------+--------------------+---+-------------+----------+--------+-------+-------+
|oscar_no|oscar_yr|       award|               name|               movie|age|     birth_pl|birth_date|birth_mo|birth_d|birth_y|
+--------+--------+------------+-------------------+--------------------+---+-------------+----------+--------+-------+-------+
|       1|    1929|Best actress|       Janet Gaynor|          7th Heaven| 22| Pennsylvania|1906-10-06|      10|      6|   1906|
|       2|    1930|Best actress|      Mary Pickford|            Coquette| 37|       Canada|1892-04-08|       4|      8|   1892|
|       3|    1931|Best actress|      Norma Shearer|        The Divorcee| 28|       Canada|1902-08-10|       8|     10|   1902|
|       4|    1932|Best actress|     Marie Dressler|        Min and Bill| 63|       Canada|1868-11-09|      11|      9|   1868|
|       5|    1933|Best actress|        Helen Hayes|The Sin of Madelo...| 32|Washington DC|1900-10-10|  

### Load JSON file into DataFrame

In [10]:
# Path of the sales JSON document, relative to the notebook.
json_file = "../data/sales.json"

# The reader is put in multi-line mode before loading the file.
df_json = spark.read.option("multiLine", True).json(json_file)

df_json.show()

+-----------------+-----+--------------+-------------+
|           client|price|       product|purchase_date|
+-----------------+-----+--------------+-------------+
|     Norma Fisher|  121|Vacuum cleaner|   2022-08-09|
|     Norma Fisher|  348|Microwave oven|   2022-08-13|
|     Norma Fisher| 1126|         Phone|   2022-08-12|
|   Jorge Sullivan|  171|Microwave oven|   2022-08-10|
|  Elizabeth Woods| 1766|            TV|   2022-08-26|
|     Susan Wagner|  461|Microwave oven|   2022-08-26|
|     Susan Wagner|  561|Microwave oven|   2022-08-05|
| Peter Montgomery| 1994|            TV|   2022-08-03|
| Peter Montgomery| 2804|coffee machine|   2022-08-16|
|Stephanie Collins|  403|Vacuum cleaner|   2022-08-20|
|Stephanie Collins| 1775|coffee machine|   2022-08-18|
| Stephanie Sutton|  613|Vacuum cleaner|   2022-08-09|
| Stephanie Sutton| 2148|            TV|   2022-08-30|
| Stephanie Sutton|  568|Microwave oven|   2022-08-01|
|       Susan Levy|  109|coffee machine|   2022-08-20|
|       Su

In [21]:
# Schema for the nested JSON records, built from the innermost struct outwards.
address_type = StructType([
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
])
details_type = StructType([
    StructField("age", IntegerType(), True),
    StructField("address", address_type, True),
])
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("details", details_type, True),
])

# Read a single file from the folder with that schema.
nested_json_file = "../data/few_files/2.json"
df_nested_json = spark.read.schema(schema).json(nested_json_file, multiLine=True)

# df_nested_json.show(truncate=False)

In [22]:
from pathlib import Path
from functools import reduce
from pyspark.sql import DataFrame

# Collect every JSON file in the folder in a stable (sorted) order.
files = sorted(Path("../data/few_files/").glob("*.json"))

# Read each file with the nested schema, one DataFrame per file.
dfs = []
for json_path in files:
    dfs.append(spark.read.schema(schema).json(str(json_path), multiLine=True))

# Fold the per-file DataFrames into one, matching columns by name.
df_all = reduce(lambda merged, nxt: merged.unionByName(nxt), dfs)

df_all.show(truncate=False)

+---+---------------+-----------------------+
|id |name           |details                |
+---+---------------+-----------------------+
|1  |John Doe       |{30, {New York, NY}}   |
|2  |Jane Smith     |{25, {Los Angeles, CA}}|
|3  |Michael Johnson|{40, {Chicago, IL}}    |
+---+---------------+-----------------------+



In [23]:
# Print the nested schema applied to the single-file JSON DataFrame.
df_nested_json.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- details: struct (nullable = true)
 |    |-- age: integer (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)



### Load Parquet file into DataFrame

In [24]:
# Path of the Titanic Parquet file, relative to the notebook.
parquet_file = "../data/titanic.parquet"

# Load it through the generic format/load reader API.
df_parquet = spark.read.format("parquet").load(parquet_file)

# Show DataFrame
df_parquet.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|  male|NULL|    0|    0|      

### Load Text file into DataFrame

In [25]:
# Path of the Android log file, relative to the notebook.
text_file = "../data/android.txt"
# Read the file as text; the output below shows a single `value` column.
df_text = spark.read.text(text_file)

# Show DataFrame
df_text.show()

+--------------------+
|               value|
+--------------------+
|03-17 16:13:38.81...|
|03-17 16:13:38.81...|
|03-17 16:13:38.82...|
|03-17 16:13:38.83...|
|03-17 16:13:38.85...|
|03-17 16:13:38.86...|
|03-17 16:13:38.86...|
|03-17 16:13:38.87...|
|03-17 16:13:38.87...|
|03-17 16:13:38.87...|
|03-17 16:13:38.88...|
|03-17 16:13:38.88...|
|03-17 16:13:38.88...|
|03-17 16:13:38.88...|
|03-17 16:13:38.90...|
|03-17 16:13:38.90...|
|03-17 16:13:38.91...|
|03-17 16:13:38.92...|
|03-17 16:13:38.92...|
|03-17 16:13:38.93...|
+--------------------+
only showing top 20 rows



In [26]:
# SparkContext of the active session; reused by the RDD cells further down.
sc = spark.sparkContext

# Load text file into an RDD
rdd = sc.textFile(text_file)

In [27]:
# Fetch the first 10 lines of the RDD. In the recorded run this raised
# "Python worker failed to connect back" (see the traceback below).
rdd.take(10) # Didn't work

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 26.0 failed 1 times, most recent failure: Lost task 0.0 in stage 26.0 (TID 33) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:713)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:757)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:675)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:641)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:617)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:574)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:532)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:181)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:713)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:757)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:675)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:641)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:617)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:574)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:532)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more


In [28]:
# Work-around for the failing RDD call: read the same file through the
# DataFrame API and print the first 10 rows without truncating them.
df_text = spark.read.text(text_file)
df_text.show(n=10, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                                         |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|03-17 16:13:38.811  1702  2395 D Win

### Loading Data from Apache Kafka

In [None]:
# Initialize Spark session with Kafka package.
# NOTE(review): "spark.jars.packages" is presumably only honoured when the
# session is first created; if a session already exists, getOrCreate() returns
# it and the connector has to be supplied at startup instead - confirm.
# NOTE(review): the connector version (3.1.1) should match the running Spark
# version - confirm.
spark = SparkSession.builder \
    .appName("Load Kafka") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1") \
    .getOrCreate()

# Load data from Kafka as a bounded batch (spark.read, not spark.readStream).
# show() is not supported on a streaming DataFrame, so the original
# readStream + show() combination could not run. The offsets below make the
# batch range explicit: everything currently in the topic.
kafka_df = spark.read.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "your_topic") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

# Kafka keys and values arrive as binary; cast them to strings for display.
kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show()

### Loading Data from a JDBC Source with Partitioning

In [None]:
# Load data from a SQL database with partitioning.
# Connection settings are taken from environment variables so that real
# credentials never have to be typed into (and saved with) the notebook.
# The original placeholder values remain as defaults.
jdbc_url = os.environ.get("JDBC_URL", "jdbc:postgresql://your-db-host:5432/your-database")
properties = {
    "user": os.environ.get("JDBC_USER", "your-username"),
    "password": os.environ.get("JDBC_PASSWORD", "your-password"),
    "driver": "org.postgresql.Driver"
}

# Partitioned read on the numeric `id` column: the 1..100000 range is divided
# across 10 partitions.
df_sql_partitioned = spark.read.jdbc(
    url=jdbc_url,
    table="your_table_name",
    properties=properties,
    column="id",
    lowerBound=1,
    upperBound=100000,
    numPartitions=10
)

df_sql_partitioned.show()


### RDD

In [None]:
# First 10 lines of the text-file RDD created in the "Load Text file" section.
rdd.take(10)

In [None]:
# Count the elements of the RDD (one per line of the text file) and report it.
line_count = rdd.count()
print(f"Number of lines: {line_count}")



In [None]:
# Fetch only the first element of the RDD and print it.
first_line = rdd.first()
print(f"First line: {first_line}")

In [None]:
def _mentions_window_manager(line):
    """Return True when the log line contains the 'WindowManager:' tag."""
    return 'WindowManager:' in line


# Keep only the matching lines and preview the first 10 of them.
filtered_rdd = rdd.filter(_mentions_window_manager)
filtered_rdd.take(10)

In [None]:
# Map every line to its length in characters and preview the first 10 values.
line_lengths = rdd.map(len)
line_lengths.take(10)

In [None]:
# Reduce to get the total number of characters in the text file
# (sums the per-line lengths computed in the previous cell).
total_characters = line_lengths.reduce(lambda a, b: a + b)
print(f"Total number of characters: {total_characters}")

In [None]:
# Split every line on single spaces and flatten the pieces into one RDD;
# preview the first 10 tokens.
rdd.flatMap(lambda line: line.split(" ")).take(10)

In [None]:
# Word count over space-separated tokens.
# 1) one element per token
words = rdd.flatMap(lambda text: text.split(" "))
# 2) pair every token with an initial count of 1
word_pairs = words.map(lambda token: (token, 1))
# 3) add up the counts that share the same token
word_counts = word_pairs.reduceByKey(lambda left, right: left + right)

word_counts.take(10)

In [None]:
# Schema for the (word, count) pairs. It has its own name so that it does not
# overwrite the `schema` variable used by the JSON-loading cells above.
word_count_schema = StructType([
    StructField("word", StringType(), True),
    StructField("count", IntegerType(), True)
])

# Build the DataFrame first, then display it. show() returns None, so chaining
# it onto the assignment would leave word_counts_df bound to None.
word_counts_df = spark.createDataFrame(word_counts, word_count_schema)
word_counts_df.show()


In [None]:
import re

In [None]:
# Regular expression for "real" words: runs of ASCII letters that are
# delimited by word boundaries on both sides.
pattern = re.compile(r'\b[A-Za-z]+\b')


def extract_words(line):
    """Return, in order, every substring of ``line`` matched by ``pattern``."""
    return [match.group(0) for match in pattern.finditer(line)]

# Apply extract_words to every line and flatten the results into one RDD.
words_rdd = rdd.flatMap(extract_words)

# Preview the first 10 extracted words.
words_rdd.take(10)

In [None]:
# Recompute the word counts, this time from the regex-extracted words.
# Note that this rebinds word_pairs and word_counts from the earlier cell.
word_pairs = words_rdd.map(lambda word: (word, 1))
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)

# First 20 (word, count) pairs after sorting by the key (the word).
word_counts.sortByKey().take(20)

In [None]:
# First 20 pairs after sorting by the count (second tuple element), descending.
word_counts.sortBy(lambda x: x[1], ascending=False).take(20)

In [None]:
# De-duplicate the (word, count) pairs and preview 20 of them.
word_counts.distinct().take(20)

In [None]:
# Number of (word, count) pairs in the RDD.
word_counts.count()

In [None]:
# NOTE(review): the value returned by collect() is neither assigned nor the
# last expression of the cell, so it is discarded; only the partition count
# below is displayed.
word_counts.collect()

# Number of partitions of the word-count RDD.
word_counts.getNumPartitions()

In [None]:
# Join two key/value RDDs on their keys.
left_pairs = [("a", 1), ("b", 2)]
right_pairs = [("a", 3), ("b", 4), ("c", 5)]

rdd1 = sc.parallelize(left_pairs)
rdd2 = sc.parallelize(right_pairs)

result = rdd1.join(rdd2)
result.take(10)

In [None]:
# Concatenate two RDDs of integers; the value 3 appears in both inputs.
first_values = [1, 2, 3]
second_values = [3, 4, 5]

rdd1 = sc.parallelize(first_values)
rdd2 = sc.parallelize(second_values)

result = rdd1.union(rdd2)
result.take(10)

### DataFrame

In [None]:
# Project a single column, then two columns.
# NOTE(review): the Parquet columns are capitalised (`Name`, `Age`), so the
# lower-case names used throughout this section presumably rely on
# case-insensitive column resolution - confirm.
df_parquet.select("name").show()
df_parquet.select("name", "age").show()

In [None]:
# Filter variant 1: condition built from a column taken off the DataFrame.
df_parquet.filter(df_parquet["age"] > 25).show()

In [None]:
# Filter variant 2: the same condition written as a SQL expression string.
df_parquet.filter("age > 25").show()

In [None]:
# Filter variant 3: the same condition built with F.col.
df_parquet.filter(F.col("age") > 25).show()

In [None]:
# Combine two conditions with `&`: age strictly between 25 and 30.
# Each comparison is wrapped in its own parentheses.
df_parquet.filter((F.col("age") > 25) & (F.col("age") < 30)).show()

In [None]:
# Number of rows per distinct age value.
df_grouped = df_parquet.groupBy("age").count()
df_grouped.show()

In [None]:
# Grouping on its own: no aggregation is applied and the result is not
# assigned, so the cell presumably only displays the grouped object's repr.
df_parquet.groupBy("age")

In [None]:
# Average fare per distinct age value (rebinds df_grouped from the count cell).
df_grouped = df_parquet.groupBy("age").avg("Fare")
df_grouped.show()

In [None]:
# Average age over the whole DataFrame (no grouping).
df_parquet.agg(F.avg("age")).show()

In [None]:
# Minimum age over the whole DataFrame.
df_parquet.agg(F.min("age")).show()

In [None]:
# Maximum age over the whole DataFrame.
df_parquet.agg(F.max("age")).show()

In [None]:
# Add a derived column holding age + 10; df_parquet itself is not reassigned.
df_parquet.withColumn("age_in_10_years", F.col("age") + 10).show()

In [None]:
# Two small in-memory DataFrames that share the `name` column.
data = [("John", 30), ("Jane", 25), ("Doe", 22)]
data2 = [("John", "USA"), ("Jane", "UK"), ("Doe", "Canada")]

df = spark.createDataFrame(data, schema=["name", "age"])
df2 = spark.createDataFrame(data2, schema=["name", "country"])

# Inner join on `name`.
df_joined = df.join(df2, "name", "inner")
df_joined.show()

In [None]:
# Sort by age using the default order, then explicitly descending.
df_parquet.orderBy("age").show()
df_parquet.orderBy(F.col("age").desc()).show()

In [None]:
# Show the data without the age column; df_parquet itself is not reassigned.
df_parquet.drop("age").show()

In [None]:
# Show df_parquet again: the previous cell did not reassign it.
df_parquet.show()

In [None]:
# Distinct rows, considering all columns.
df_parquet.distinct().show()

In [None]:
# Drop duplicate rows; no column subset is given here.
df_parquet.dropDuplicates().show()

In [None]:
# Drop duplicates judged on the age column only.
df_parquet.dropDuplicates(["age"]).show()

In [None]:
# Replace missing ages with 0; the dict maps column name -> fill value.
df_parquet.fillna({"age": 0}).show()

In [None]:
# Replace values in the Pclass column only: 3 becomes 30 and 1 becomes 10.
df_parquet.replace({3: 30, 1: 10}, subset='Pclass').show()

In [None]:
# Random sample of roughly half of the rows. The fixed seed makes the sample
# the same on every run, so the notebook output is reproducible.
df_parquet.sample(fraction=0.5, seed=42).show()

In [None]:
# Two frames with the same columns that share the row ("John", 30).
data1 = [("John", 30), ("Jane", 25)]
data2 = [("John", 30), ("Doe", 22)]
people_columns = ["name", "age"]

df1 = spark.createDataFrame(data1, schema=people_columns)
df2 = spark.createDataFrame(data2, schema=people_columns)

# Rows present in both frames.
df1.intersect(df2).show()

In [None]:
# Two frames with the same columns and no rows in common.
data1 = [("John", 30), ("Jane", 25)]
data2 = [("Doe", 22), ("Smith", 35)]

df1, df2 = [
    spark.createDataFrame(rows, ["name", "age"])
    for rows in (data1, data2)
]

# Stack the rows of the second frame under those of the first.
df1.union(df2).show()

In [None]:
# Same inputs as the intersect example: ("John", 30) is in both frames.
data1 = [("John", 30), ("Jane", 25)]
data2 = [("John", 30), ("Doe", 22)]

df1 = spark.createDataFrame(data=data1, schema=["name", "age"])
df2 = spark.createDataFrame(data=data2, schema=["name", "age"])

# Rows of df1 that do not appear in df2.
rows_only_in_df1 = df1.exceptAll(df2)
rows_only_in_df1.show()

In [None]:
# Show the statistics table produced by describe() for df_parquet.
df_parquet.describe().show()

In [None]:
# Select the name column under the alias full_name, together with age.
df_parquet.select(df_parquet["name"].alias("full_name"), "age").show()

In [None]:
# Convert the Spark DataFrame to pandas and display it as the cell output.
# NOTE(review): this presumably brings every row to the driver - fine for a
# small dataset, confirm before reusing on large data.
pandas_df = df_parquet.toPandas()
pandas_df

In [None]:
# Show the data with the name column renamed to full_name.
df_parquet.withColumnRenamed("name", "full_name").show()

In [None]:
# Sort by age descending, using sort() and a column taken off the DataFrame.
df_parquet.sort(df_parquet["age"].desc()).show()

In [None]:
# Drop down to the DataFrame's underlying RDD and fetch its first 10 elements.
df_parquet.rdd.take(10)

In [None]:
# Restrict the DataFrame to 2 rows and show them.
df_parquet.limit(2).show()

In [None]:
# df_parquet.head(2)

In [None]:
# Split the data into three random parts with weights 0.3 / 0.4 / 0.3 and show
# each part. The fixed seed makes the split the same on every run.
for part in df_parquet.randomSplit([0.3, 0.4, 0.3], seed=42):
    part.show()


In [None]:
# Write the DataFrame back out as Parquet, replacing any previous output.
# The target sits under the same ../data directory that every input in this
# notebook is read from (the original "./data/..." pointed at a different
# folder, next to the notebook).
df_parquet.write.mode("overwrite").parquet("../data/output/new_data.parquet")

In [None]:
# Rows where age is present.
df_parquet.filter(F.col("age").isNotNull()).show()

In [None]:
# Rows where age is missing.
df_parquet.filter(F.col("age").isNull()).show()

In [None]:
# Drop rows with missing values (dropna called with its defaults) and show the rest.
df_parquet.dropna().show()

In [None]:
# Number of distinct values in the name column.
df_parquet.select(F.countDistinct("name")).show()


In [None]:
# Correlation between age and Fare, returned as the cell output.
df_parquet.corr("age", "Fare")

In [None]:
# Covariance between age and Fare, returned as the cell output.
df_parquet.cov("age", "Fare")

In [None]:
# Approximate 25th / 50th / 75th percentiles of age; the last argument (0.01)
# is the error setting passed to approxQuantile.
quantiles = df_parquet.approxQuantile("age", [0.25, 0.5, 0.75], 0.01)
print(f"Approximate quantiles: {quantiles}")

In [None]:
# Stratified sample: keep about 50% of the "male" rows and 20% of the "female"
# rows. The strata keys must be actual values of the chosen column. The
# original keys "a"/"b"/"c" on the name column matched no value, so no row
# could be sampled. The seed makes the sample the same on every run.
df_parquet.sampleBy("Sex", fractions={"male": 0.5, "female": 0.2}, seed=42).show()

In [None]:
# Sum of Fare computed over a rollup on age.
df_parquet.rollup("age").sum("Fare").show()

In [None]:
def add_years(df):
    """Return ``df`` with an extra ``age_plus_5`` column equal to ``age`` + 5."""
    age_plus_5 = F.col("age") + 5
    return df.withColumn("age_plus_5", age_plus_5)
    
# Apply add_years through DataFrame.transform and show the result.
df_parquet.transform(add_years).show()

In [None]:
# Two unrelated frames: people and countries.
data1 = [("John", 30), ("Jane", 25)]
data2 = [("USA",), ("UK",)]

df1 = spark.createDataFrame(data1, schema=["name", "age"])
df2 = spark.createDataFrame(data2, schema=["country"])

# Perform a cross join: every person is paired with every country.
df_cross_join = df1.crossJoin(df2)
df_cross_join.show()

In [None]:
# Register the DataFrame as a temporary view named "people".
df_parquet.createOrReplaceTempView("people")

# Run SQL queries on the temporary view
spark.sql("SELECT * FROM people WHERE age > 25").show()

In [None]:
# Select using SQL expression strings: a renamed column and a computed one.
df_parquet.selectExpr("name as full_name", "age + 5 as age_in_5_years").show()

In [None]:
# Return the input files behind df_parquet as the cell output.
df_parquet.inputFiles()

In [None]:
# Check whether the DataFrame has no rows.
df_parquet.isEmpty()

In [None]:
# Attribute (not a call) reporting whether the DataFrame is a streaming one.
df_parquet.isStreaming

In [None]:
# Show the statistics table produced by summary() for df_parquet.
df_parquet.summary().show()

In [None]:
# Window definition used by the ranking functions in the next cell:
# one partition per sex, rows ordered by Survived.
window_spec = Window.partitionBy("sex").orderBy("Survived")

In [None]:
# Apply row_number over window_spec and keep only the row numbered 1 in each partition
df_parquet.withColumn("row_number", F.row_number().over(window_spec)).where("row_number = 1").show()

# Apply rank over window_spec and keep the rows whose rank is 1
df_parquet.withColumn("rank", F.rank().over(window_spec)).where("rank = 1").show()

# Apply dense_rank over window_spec and keep the rows whose dense rank is 1 or 2
df_parquet.withColumn("dense_rank", F.dense_rank().over(window_spec)).where(F.col("dense_rank").isin([1, 2])).show()


In [None]:
# Pattern matching with like(): `%` in the pattern stands for any run of characters.

# Filter names that contain the substring 'a'
df_parquet.filter(F.col("name").like("%a%")).show()

# Filter names that start with 'A'
df_parquet.filter(F.col("name").like("A%")).show()

# Filter names that end with 'e'
df_parquet.filter(F.col("name").like("%e")).show()

In [None]:
# Filter names that end with 'A'
df_parquet.filter(F.col("name").endswith("A")).show()

# Filter names that start with 'D'
df_parquet.filter(F.col("name").startswith("D")).show()

In [None]:
# Stop the Spark session held in `spark` now that the notebook is finished.
spark.stop()