In [2]:
from pyspark.sql import SparkSession

# Reuse the already-running SparkSession if there is one, otherwise start one.
# NOTE: despite its name, `sc` holds a SparkSession (not a SparkContext).
sc = SparkSession.builder.getOrCreate()

24/02/26 13:27:24 WARN Utils: Your hostname, Davids-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.41 instead (on interface en0)
24/02/26 13:27:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/26 13:27:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pyspark.sql import Row

# Toy earnings data: one Row per (course, year, earnings) record.
# dotNET deliberately has two records for 2012 so aggregations have work to do.
df1 = sc.createDataFrame(
    [
        Row(course=course_name, year=course_year, earnings=amount)
        for course_name, course_year, amount in (
            ("dotNET", 2012, 10000),
            ("Java", 2012, 20000),
            ("dotNET", 2012, 5000),
            ("dotNET", 2013, 48000),
            ("Java", 2013, 30000),
        )
    ]
)
df1.show()

                                                                                

+------+----+--------+
|course|year|earnings|
+------+----+--------+
|dotNET|2012|   10000|
|  Java|2012|   20000|
|dotNET|2012|    5000|
|dotNET|2013|   48000|
|  Java|2013|   30000|
+------+----+--------+



In [31]:
# Print each column of df1 with its inferred type and nullability.
df1.printSchema()

root
 |-- course: string (nullable = true)
 |-- year: long (nullable = true)
 |-- earnings: long (nullable = true)



In [30]:
# Show summary statistics (count, mean, stddev, min, max) for every column of df1.
df1.describe().show()

[Stage 114:>                                                        (0 + 4) / 4]

+-------+------+------------------+------------------+
|summary|course|              year|          earnings|
+-------+------+------------------+------------------+
|  count|     5|                 5|                 5|
|   mean|  NULL|            2012.4|           22600.0|
| stddev|  NULL|0.5477225575051662|17140.595088852664|
|    min|  Java|              2012|              5000|
|    max|dotNET|              2013|             48000|
+-------+------+------------------+------------------+



                                                                                

In [35]:
from pyspark.sql.functions import sum

# Total earnings per course, restricted to the 2012 records.
# `sum` here is the Spark aggregate imported above, not Python's builtin.
(
    df1.where(df1["year"] == 2012)
    .select("course", "earnings")
    .groupBy("course")
    .agg(sum("earnings").alias("earnings"))
    .show()
)

[Stage 124:>                                                        (0 + 4) / 4]

+------+--------+
|course|earnings|
+------+--------+
|dotNET|   15000|
|  Java|   20000|
+------+--------+



                                                                                

In [6]:
from pyspark.sql.functions import coalesce, col, lit

# Earnings per year with one column per course, plus a per-year total.
#
# The pivot values are listed explicitly so that both course columns always
# exist, even if one course is absent from the data, and so Spark does not
# have to scan the data first to discover the distinct values.
#
# A course with no rows in a given year pivots to NULL, and NULL + x is NULL,
# so each addend is defaulted to 0 to keep `Total` meaningful.
(
    df1.groupBy("year")
    .pivot("course", ["Java", "dotNET"])
    .sum("earnings")
    .withColumn(
        "Total",
        coalesce(col("Java"), lit(0)) + coalesce(col("dotNET"), lit(0)),
    )
    .show()
)



+----+-----+------+-----+
|year| Java|dotNET|Total|
+----+-----+------+-----+
|2012|20000| 15000|35000|
|2013|30000| 48000|78000|
+----+-----+------+-----+



                                                                                

In [14]:
from pyspark.sql.functions import avg, sum

# Per-course mean and total of the raw earnings records.
# `avg` and `sum` are the Spark aggregates imported above.
(
    df1.groupBy("course")
    .agg(
        avg("earnings").alias("average_earnings"),
        sum("earnings").alias("total_earnings"),
    )
    .show()
)



+------+----------------+--------------+
|course|average_earnings|total_earnings|
+------+----------------+--------------+
|dotNET|         21000.0|         63000|
|  Java|         25000.0|         50000|
+------+----------------+--------------+



                                                                                

In [23]:
from pyspark.sql.window import Window
from pyspark.sql.functions import sum

# Running total of earnings per course, in year order.
#
# `Window` is a utility class whose builders are static, so it is used
# directly rather than instantiated. The frame is spelled out as a ROWS frame
# (start of partition up to the current row) so the cumulative sum does not
# depend on the implicit default frame of an ordered window.
window_spec = (
    Window.partitionBy("course")
    .orderBy("year")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# First collapse to one row per (course, year), then accumulate over the window.
# The column name "culmul_earning" is kept as-is because later cells (the join
# and the parquet output) carry it forward.
df2 = (
    df1.groupBy("course", "year")
    .agg(sum("earnings").alias("earnings"))
    .withColumn("culmul_earning", sum("earnings").over(window_spec))
)
df2.show()



+------+----+--------+--------------+
|course|year|earnings|culmul_earning|
+------+----+--------+--------------+
|  Java|2012|   20000|         20000|
|  Java|2013|   30000|         50000|
|dotNET|2012|   15000|         15000|
|dotNET|2013|   48000|         63000|
+------+----+--------+--------------+



                                                                                

In [24]:
# Import every Spark function this cell uses so it does not depend on imports
# made by other cells. `sum` is the Spark aggregate, not Python's builtin.
from pyspark.sql.functions import col, sum, when

# Bucket an `earnings` value: below 10000 -> "low", below 30000 -> "mid",
# anything else -> "high". Conditions are evaluated in order.
earnings_tier = (
    when(col("earnings") < 10000, "low")
    .when(col("earnings") < 30000, "mid")
    .otherwise("high")
)

# Total earnings per (course, year), labelled with its tier and sorted for display.
df3 = (
    df1.groupBy("course", "year")
    .agg(sum("earnings").alias("earnings"))
    .withColumn("earnings_range", earnings_tier)
    .orderBy("course", "year")
)
df3.show()

+------+----+--------+--------------+
|course|year|earnings|earnings_range|
+------+----+--------+--------------+
|  Java|2012|   20000|           mid|
|  Java|2013|   30000|          high|
|dotNET|2012|   15000|           mid|
|dotNET|2013|   48000|          high|
+------+----+--------+--------------+



In [28]:
# Attach the cumulative earnings from df2 to the tiered totals in df3.
#
# df2 also carries an `earnings` column. It is projected down to the join keys
# plus the one new column *before* joining, so the result never contains two
# `earnings` columns and nothing has to be disambiguated and dropped afterwards.
df4 = (
    df3.join(
        df2.select("course", "year", "culmul_earning"),
        on=["course", "year"],
        how="left",  # keep every df3 row even if df2 has no match
    )
    .sort("course", "year")
)
df4.show()

                                                                                

+------+----+--------+--------------+--------------+
|course|year|earnings|earnings_range|culmul_earning|
+------+----+--------+--------------+--------------+
|  Java|2012|   20000|           mid|         20000|
|  Java|2013|   30000|          high|         50000|
|dotNET|2012|   15000|           mid|         15000|
|dotNET|2013|   48000|          high|         63000|
+------+----+--------+--------------+--------------+



In [37]:
# Persist df4 as parquet under "my.parquet", split by `year`,
# replacing whatever an earlier run left at that path.
(
    df4.write
    .mode("overwrite")
    .partitionBy("year")
    .parquet("my.parquet")
)

                                                                                

In [7]:
from datetime import date, datetime

from pyspark.sql.types import (
    StructType,
    IntegerType,
    FloatType,
    DateType,
    TimestampType,
    StructField,
    StringType,
)

# Explicit schema: five nullable columns, one per Spark type being demonstrated.
schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("a", IntegerType()),
            ("b", FloatType()),
            ("c", StringType()),
            ("d", DateType()),
            ("e", TimestampType()),
        )
    ]
)

# Each tuple supplies values for columns a..e in schema order.
df = sc.createDataFrame(
    data=[
        (1, 2.0, "string1", date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
        (2, 3.0, "string1", date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
        (3, 3.0, "string2", date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
        (4, 4.0, "string3", date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)),
    ],
    schema=schema,
)

df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string1|2000-02-01|2000-01-02 12:00:00|
|  3|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [None]:
# Load the "my.parquet" dataset (written by an earlier cell from df4) back into a DataFrame.
dz = sc.read.parquet("my.parquet")