In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql.functions import when, col, lit, split, filter, explode, array, array_contains, count, min, max, mean, expr, udf, from_json, to_json
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType, MapType, ArrayType, BooleanType

In [2]:
# create spark session
spark = SparkSession.builder.getOrCreate()

In [3]:
# create dataframe and schema
data = [(1, 'Alex', 'Morgan', 'Chelsea, England', 2000, {'gender': 'F', 'age': 23}, 1),
        (2, 'Jake', 'Jonas', 'Barcelona, Spain', 10000, {'gender': 'M', 'age': 35}, 3),
        (3, 'John', 'Fisk', 'CSKA, Russia', 1000, {'gender': 'M', 'age': 30}, 5)]


data_1 = [(4, 'Lionel', 'Messi', 'France, PSG', 50000, {'gender': 'M', 'age': 34}, 2),
          (5, 'Michael', 'Duglas', 'New York Galaxy, USA', 400, {'gender': 'M', 'age': 61}, 3),
          (6, 'Jane', 'One', 'Milan, Italy', 3000, {'gender': 'F', 'age': 21}, 1),
          (7, 'Jane', 'One', 'Milan, Italy', 3000, {'gender': 'F', 'age': 21}, 2)]

data_2 = [(1, 'IT'),
          (2, 'HR'),
          (3, 'Development'),
          (4, 'Analyitcs')]

schema = StructType([StructField(name='id', dataType=IntegerType()),
                     StructField(name='first_name', dataType=StringType()),
                     StructField(name='second_name', dataType=StringType()),
                     StructField(name='team', dataType=StringType()),
                     StructField(name='salary', dataType=IntegerType()),
                     StructField(name='gender_and_age', dataType=MapType(StringType(), StringType())),
                     StructField(name='dep_id', dataType=IntegerType())
                    ])

schema_1 = StructType([StructField(name='id', dataType=IntegerType()),
                       StructField(name='dep_name', dataType=StringType())
                     ])

df = spark.createDataFrame(data=data, schema=schema)
df_1 = spark.createDataFrame(data=data_1, schema=schema)
df_2 = spark.createDataFrame(data=data_2, schema=schema_1)

In [4]:
# data transformations
df = df.select(df.id, 
               df.first_name, 
               split('team', ',')[0].alias('footbal_club'),
               split('team', ',')[1].alias('footbal_country'), 
               df.salary,
               df.gender_and_age['gender'].alias('gender'),
               df.gender_and_age['age'].alias('age'),
               df.dep_id.alias('dep_id'))
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- footbal_club: string (nullable = true)
 |-- footbal_country: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- dep_id: integer (nullable = true)



In [5]:
# data transformations
df_1 = df_1.select(df_1.id, 
                   df_1.first_name, 
                   split('team', ',')[0].alias('footbal_club'),
                   split('team', ',')[1].alias('footbal_country'), 
                   df_1.salary,
                   df_1.gender_and_age['gender'].alias('gender'),
                   df_1.gender_and_age['age'].alias('age'),
                   df_1.dep_id.alias('dep_id'),
                   when(df_1.gender_and_age['age'] > 30, True).otherwise(False).alias('is_high_age').cast(BooleanType()))
df_1.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- footbal_club: string (nullable = true)
 |-- footbal_country: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- dep_id: integer (nullable = true)
 |-- is_high_age: boolean (nullable = false)



In [6]:
df_2.show()
df_2.printSchema()

+---+-----------+
| id|   dep_name|
+---+-----------+
|  1|         IT|
|  2|         HR|
|  3|Development|
|  4|  Analyitcs|
+---+-----------+

root
 |-- id: integer (nullable = true)
 |-- dep_name: string (nullable = true)



In [7]:
# check save and loading
df.coalesce(1).write.format('csv').mode('overwrite').options(header=True).save('test')
df_1.coalesce(1).write.format('csv').mode('overwrite').options(header=True).save('test1')

df = spark.read.format('csv').options(header=True).load('test')
df_1 = spark.read.format('csv').options(header=True).load('test1')

In [8]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- footbal_club: string (nullable = true)
 |-- footbal_country: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- dep_id: string (nullable = true)



In [9]:
# filter
df.filter((df.age > 30) | (df.footbal_club.like('%a%'))).show()

+---+----------+------------+---------------+------+------+---+------+
| id|first_name|footbal_club|footbal_country|salary|gender|age|dep_id|
+---+----------+------------+---------------+------+------+---+------+
|  1|      Alex|     Chelsea|        England|  2000|     F| 23|     1|
|  2|      Jake|   Barcelona|          Spain| 10000|     M| 35|     3|
+---+----------+------------+---------------+------+------+---+------+



In [10]:
# drop duplicates
df.dropDuplicates(['gender']).show()

+---+----------+------------+---------------+------+------+---+------+
| id|first_name|footbal_club|footbal_country|salary|gender|age|dep_id|
+---+----------+------------+---------------+------+------+---+------+
|  1|      Alex|     Chelsea|        England|  2000|     F| 23|     1|
|  2|      Jake|   Barcelona|          Spain| 10000|     M| 35|     3|
+---+----------+------------+---------------+------+------+---+------+



In [11]:
# sorting
df.orderBy(df.gender.asc(), df.first_name.desc()).show()
df_1.sort(['gender', 'first_name'], ascending=[True, False]).show()

+---+----------+------------+---------------+------+------+---+------+
| id|first_name|footbal_club|footbal_country|salary|gender|age|dep_id|
+---+----------+------------+---------------+------+------+---+------+
|  1|      Alex|     Chelsea|        England|  2000|     F| 23|     1|
|  3|      John|        CSKA|         Russia|  1000|     M| 30|     5|
|  2|      Jake|   Barcelona|          Spain| 10000|     M| 35|     3|
+---+----------+------------+---------------+------+------+---+------+

+---+----------+---------------+---------------+------+------+---+------+-----------+
| id|first_name|   footbal_club|footbal_country|salary|gender|age|dep_id|is_high_age|
+---+----------+---------------+---------------+------+------+---+------+-----------+
|  6|      Jane|          Milan|          Italy|  3000|     F| 21|     1|      false|
|  7|      Jane|          Milan|          Italy|  3000|     F| 21|     2|      false|
|  5|   Michael|New York Galaxy|            USA|   400|     M| 61|     3

In [12]:
# union (or unionByName)
df.unionByName(other=df_1, allowMissingColumns=True).dropDuplicates().show()

+---+----------+---------------+---------------+------+------+---+------+-----------+
| id|first_name|   footbal_club|footbal_country|salary|gender|age|dep_id|is_high_age|
+---+----------+---------------+---------------+------+------+---+------+-----------+
|  4|    Lionel|         France|            PSG| 50000|     M| 34|     2|       true|
|  6|      Jane|          Milan|          Italy|  3000|     F| 21|     1|      false|
|  5|   Michael|New York Galaxy|            USA|   400|     M| 61|     3|       true|
|  7|      Jane|          Milan|          Italy|  3000|     F| 21|     2|      false|
|  2|      Jake|      Barcelona|          Spain| 10000|     M| 35|     3|       NULL|
|  3|      John|           CSKA|         Russia|  1000|     M| 30|     5|       NULL|
|  1|      Alex|        Chelsea|        England|  2000|     F| 23|     1|       NULL|
+---+----------+---------------+---------------+------+------+---+------+-----------+



In [13]:
# grouping
df.groupBy('gender').agg(count('footbal_country').alias('count_od_football_countries'), 
                         min('age').alias('minimum_age')).show()

+------+---------------------------+-----------+
|gender|count_od_football_countries|minimum_age|
+------+---------------------------+-----------+
|     F|                          1|         23|
|     M|                          2|         30|
+------+---------------------------+-----------+



In [14]:
# join
df.join(other=df_2, on=df.dep_id==df_2.id, how='leftanti').show()

+---+----------+------------+---------------+------+------+---+------+
| id|first_name|footbal_club|footbal_country|salary|gender|age|dep_id|
+---+----------+------------+---------------+------+------+---+------+
|  3|      John|        CSKA|         Russia|  1000|     M| 30|     5|
+---+----------+------------+---------------+------+------+---+------+



In [15]:
# fillna and pivot
df_3 = df.groupby('footbal_country').pivot('first_name').count().fillna(0)
df_3.show()

+---------------+----+----+----+
|footbal_country|Alex|Jake|John|
+---------------+----+----+----+
|         Russia|   0|   0|   1|
|          Spain|   0|   1|   0|
|        England|   1|   0|   0|
+---------------+----+----+----+



In [16]:
# unpivot
df_3.select(df_3.footbal_country, expr("stack(3, 'Alex', Alex, 'Jake', Jake, 'John', John) as (first_name, count)")).collect()[0]

Row(footbal_country='Russia', first_name='Alex', count=0)

In [17]:
# sample
df.sample(fraction=0.5, seed=123).show()

+---+----------+------------+---------------+------+------+---+------+
| id|first_name|footbal_club|footbal_country|salary|gender|age|dep_id|
+---+----------+------------+---------------+------+------+---+------+
|  1|      Alex|     Chelsea|        England|  2000|     F| 23|     1|
+---+----------+------------+---------------+------+------+---+------+



In [18]:
df.createOrReplaceTempView('first_table')
spark.sql(" SELECT *, LAG(footbal_club, 1) OVER(partition by gender order by id) as test FROM first_table").show()

+---+----------+------------+---------------+------+------+---+------+---------+
| id|first_name|footbal_club|footbal_country|salary|gender|age|dep_id|     test|
+---+----------+------------+---------------+------+------+---+------+---------+
|  1|      Alex|     Chelsea|        England|  2000|     F| 23|     1|     NULL|
|  2|      Jake|   Barcelona|          Spain| 10000|     M| 35|     3|     NULL|
|  3|      John|        CSKA|         Russia|  1000|     M| 30|     5|Barcelona|
+---+----------+------------+---------------+------+------+---+------+---------+



In [19]:
# udf
SalaryAge = udf(lambda x, y: x + y)
df.select(SalaryAge(df.salary.cast(IntegerType()), df.age.cast(IntegerType()))).show()

+-----------------------------------------------+
|<lambda>(cast(salary as int), cast(age as int))|
+-----------------------------------------------+
|                                           2023|
|                                          10035|
|                                           1030|
+-----------------------------------------------+



In [20]:
# rdd, map
rdd = spark.sparkContext.parallelize(data)
rdd.collect()
rdd.map(lambda x: (x[0] + x[4],)).toDF().show()

+-----+
|   _1|
+-----+
| 2001|
|10002|
| 1003|
+-----+



In [21]:
# partitionBy
df.coalesce(1).write.format('csv').mode('overwrite').partitionBy('gender').options(header=True).save('test2')
df_4 = spark.read.format('csv').options(header=True).load('test2/gender=F')

In [22]:
df_4.show()

+---+----------+------------+---------------+------+---+------+
| id|first_name|footbal_club|footbal_country|salary|age|dep_id|
+---+----------+------------+---------------+------+---+------+
|  1|      Alex|     Chelsea|        England|  2000| 23|     1|
+---+----------+------------+---------------+------+---+------+

