In [2]:
# Localhost
# http://localhost:4040/ 

# Packages
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

import os
import sys
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import rank
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.types import StructType,StructField 
from pyspark.sql.types import StringType, IntegerType, ArrayType, DateType
from pyspark.sql.types import StringType, ArrayType,StructType,StructField
from pyspark.sql.functions import regexp_replace

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# Spark Notebook

In [44]:
# Data

# F.concat()
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)]
columns = ["firstname","middlename","lastname","dob","gender","salary"]
df_concat = spark.createDataFrame(data=data, schema = columns)


# F.explode()
spark = SparkSession.builder.appName('pyspark-by-examples').getOrCreate()
arrayData = [
        ('James',['Java','Scala'],{'hair':'black','eye':'brown'}),
        ('Michael',['Spark','Java',None],{'hair':'brown','eye':None}),
        ('Robert',['CSharp',''],{'hair':'red','eye':''}),
        ('Washington',None,None),
        ('Jefferson',['1','2'],{})]
df_explode = spark.createDataFrame(data=arrayData, schema = ['name','knownLanguages','properties'])


# when()
data = [("James","M",60000),("Michael","M",70000),
        ("Robert",None,400000),("Maria","F",500000),
        ("Jen","",None)]
columns = ["name","gender","salary"]
df_when = spark.createDataFrame(data = data, schema = columns)


# split()
data=data = [('James','','Smith','1991-04-01'),
  ('Michael','Rose','','2000-05-19'),
  ('Robert','','Williams','1978-09-05'),
  ('Maria','Anne','Jones','1967-12-01'),
  ('Jen','Mary','Brown','1980-02-17')]
columns=["firstname","middlename","lastname","dob"]
df_split = spark.createDataFrame(data,columns)


# rank()
simpleData = (("James", "Sales", 3000), \
    ("Michael", "Sales", 4600),  \
    ("Robert", "Sales", 4100),   \
    ("Maria", "Finance", 3000),  \
    ("James", "Sales", 3000),    \
    ("Scott", "Finance", 3300),  \
    ("Jen", "Finance", 3900),    \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000),\
    ("Saif", "Sales", 4100))
columns= ["employee_name", "department", "salary"]
df_rank = spark.createDataFrame(data = simpleData, schema = columns)


# withColumn()
data = [('James','','Smith','2020-01-01','M',3000),
  ('Michael','Rose','','2021-01-01','M',4000),
  ('Robert','','Williams','2021-01-01','M',4000),
  ('Maria','Anne','Jones','2020-01-01','F',4000),
  ('Jen','Mary','Brown','1982-01-27','F',-1)]
columns = ["firstname","middlename","lastname","dob","gender","salary"]
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
df_wc = spark.createDataFrame(data=data, schema = columns)


# collect_list()
simpleData = [("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)]
schema = ["employee_name", "department", "salary"]
df_collectlist = spark.createDataFrame(data=simpleData, schema = schema)


# df_filter
data = [
    (("James","","Smith"),["Java","Scala","C++"],"OH","M","Male"),
    (("Anna","Rose",""),["Spark","Java","C++"],"NY","F","Female"),
    (("Julia","","Williams"),["CSharp","VB"],"OH","F","Male"),
    (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M","Female"),
    (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M","Male"),
    (("Mike","Mary","Williams"),["Python","VB"],"OH","M","Female")
 ]
        
schema = StructType([
     StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
         StructField('lastname', StringType(), True)
     ])),
     StructField('languages', ArrayType(StringType()), True),
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True),
     StructField('gender2', StringType(), True)
 ])

df_filter = spark.createDataFrame(data = data, schema = schema)


# array()
data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"],"OH","CA"),
 ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"],"NY","NJ"),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"],"UT","NV")]
schema = StructType([ 
    StructField("name",StringType(),True), 
    StructField("languagesAtSchool",ArrayType(StringType()),True), 
    StructField("languagesAtWork",ArrayType(StringType()),True), 
    StructField("currentState", StringType(), True), 
    StructField("previousState", StringType(), True)])

df_array = spark.createDataFrame(data=data,schema=schema)




# alert()
data = [('101', "MKD", "2021-01-01 00:00:00.0"),
        ('102', "MKD", "2021-01-01 00:00:00.0"),
        ('103', "MKD", "2020-01-01 00:00:00.0"),
        ('104', "MKD", "2020-01-01 00:00:00.0"),
        ('105', "MKD", "2019-01-01 00:00:00.0"),
        ('106', "MKD", "2019-01-01 00:00:00.0"),
        ('201', "OMA", "2021-01-01 00:00:00.0"),
        ('202', "OMA", "2021-01-01 00:00:00.0"),
        ('203', "OMA", "2020-01-01 00:00:00.0"),
        ('204', "OMA", "2020-01-01 00:00:00.0"),
        ('205', "OMA", "2019-01-01 00:00:00.0"),
        ('206', "OMA", "2019-01-01 00:00:00.0"),
        ('301', "ICE", "2021-01-01 00:00:00.0"),
        ('302', "ICE", "2021-01-01 00:00:00.0"),
        ('303', "ICE", "2020-01-01 00:00:00.0"),
        ('304', "ICE", "2020-01-01 00:00:00.0"),
        ('305', "ICE", "2019-01-01 00:00:00.0"),
        ('306', "ICE", "2019-01-01 00:00:00.0"),
        ('401', "CH",  "2021-01-01 00:00:00.0"),
        ('402', "CH",  "2021-01-01 00:00:00.0"),
        ('403', "CH",  "2020-01-01 00:00:00.0"),
        ('404', "CH",  "2020-01-01 00:00:00.0"),
        ('405', "CH",  "2019-01-01 00:00:00.0"),
        ('406', "CH",  "2019-01-01 00:00:00.0"),
        ('501', "PRT", "2021-01-01 00:00:00.0"),
        ('502', "PRT", "2021-01-01 00:00:00.0"),
        ('503', "PRT", "2020-01-01 00:00:00.0"),
        ('504', "PRT", "2020-01-01 00:00:00.0"),
        ('505', "PRT", "2019-01-01 00:00:00.0"),
        ('506', "PRT", "2019-01-01 00:00:00.0"),
]
columns = ["ID","COUNTRY","DATE",]
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
df_alert = spark.createDataFrame(data=data, schema = columns)

In [3]:
#Window.Window
#Types.StringType
#Transforms.api : transform_df, Input, Output

In [50]:
df_alert.show()
df = df_alert
df = df.withColumn("DATE", df.DATE.cast(DateType()))

max_date = df.agg({"DATE":"max"}).collect()[0][0]
df = df.filter(F.col("DATE") == F.lit(max_date))

column_list = ["ID"]
w = Window.partitionBy([col(x) for x in column_list]).orderBy("ID")
df = df.withColumn("NR_ALERTS", F.count("ID").over(w))
df = df.withColumn("NUMBER", F.row_number().over(w))
df = df.withColumn("alert_id", F.concat(df.ID, F.lit("_"), df.NUMBER))

df = df.toDF(*[c.upper() for c in df.columns])

df.show()

+---+-------+--------------------+
| ID|COUNTRY|                DATE|
+---+-------+--------------------+
|101|    MKD|2021-01-01 00:00:...|
|101|    MKD|2021-01-01 00:00:...|
|101|    MKD|2020-01-01 00:00:...|
|101|    MKD|2020-01-01 00:00:...|
|101|    MKD|2019-01-01 00:00:...|
|101|    MKD|2019-01-01 00:00:...|
|201|    OMA|2021-01-01 00:00:...|
|202|    OMA|2021-01-01 00:00:...|
|203|    OMA|2020-01-01 00:00:...|
|204|    OMA|2020-01-01 00:00:...|
|205|    OMA|2019-01-01 00:00:...|
|206|    OMA|2019-01-01 00:00:...|
|301|    ICE|2021-01-01 00:00:...|
|302|    ICE|2021-01-01 00:00:...|
|303|    ICE|2020-01-01 00:00:...|
|304|    ICE|2020-01-01 00:00:...|
|305|    ICE|2019-01-01 00:00:...|
|306|    ICE|2019-01-01 00:00:...|
|401|     CH|2021-01-01 00:00:...|
|402|     CH|2021-01-01 00:00:...|
+---+-------+--------------------+
only showing top 20 rows

+---+-------+----------+---------+------+--------+
| ID|COUNTRY|      DATE|NR_ALERTS|NUMBER|ALERT_ID|
+---+-------+----------+--------

# Functions Package

In [41]:
# F.row_number()
# Window function: returns a sequential number starting at 1 within a window partition.
# If inside a window partion there are records with the same values, then they number like 1,2,3,...
# If two unique records do not exist, then it number like 1,1,1,...
df = df_alert
df = df.withColumn("DATE", df.DATE.cast(DateType()))

column_list = ["ID"]
w = Window.partitionBy([col(x) for x in column_list]).orderBy("ID")
df = df.withColumn("NUMBER", F.row_number().over(w))

df.show()

+---+-------+----------+------+
| ID|COUNTRY|      DATE|NUMBER|
+---+-------+----------+------+
|205|    OMA|2019-01-01|     1|
|101|    MKD|2021-01-01|     1|
|101|    OMA|2021-01-01|     2|
|101|    ICE|2021-01-01|     3|
|406|     CH|2019-01-01|     1|
|203|    OMA|2020-01-01|     1|
|202|    OMA|2021-01-01|     1|
|503|    PRT|2020-01-01|     1|
|401|     CH|2021-01-01|     1|
|206|    OMA|2019-01-01|     1|
|302|    ICE|2021-01-01|     1|
|502|    PRT|2021-01-01|     1|
|505|    PRT|2019-01-01|     1|
|501|    PRT|2021-01-01|     1|
|104|    MKD|2020-01-01|     1|
|404|     CH|2020-01-01|     1|
|102|    MKD|2021-01-01|     1|
|403|     CH|2020-01-01|     1|
|103|    MKD|2020-01-01|     1|
|506|    PRT|2019-01-01|     1|
+---+-------+----------+------+
only showing top 20 rows



In [4]:
# F.lit()
# lit() and typedLit() are used to add a new column to DataFrame by assigning a literal or constant value. Both these functions return Column type as return type. 
data = [("111", 50000),("222", 60000),("333", 40000)]
columns= ["EmpId", "Salary"]
df = spark.createDataFrame(data = data, schema = columns)

# Example 1
df2 = df.select(col("EmpId"), col("Salary"), lit("1").alias("lit_value1"))
df2.show(truncate=False)

dfx = df.withColumn("new_column", col("Salary"))
dfx.show(truncate=False)

# Example 2
#df3 = df2.withColumn("lit_value2", when(col("Salary") >=40000 & col("Salary") <= 50000, lit("100")).otherwise(lit("200")))
#df3.show(truncate=False)


+-----+------+----------+
|EmpId|Salary|lit_value1|
+-----+------+----------+
|111  |50000 |1         |
|222  |60000 |1         |
|333  |40000 |1         |
+-----+------+----------+

+-----+------+----------+
|EmpId|Salary|new_column|
+-----+------+----------+
|111  |50000 |50000     |
|222  |60000 |60000     |
|333  |40000 |40000     |
+-----+------+----------+



In [5]:
# F.concat()
# To concatenate (combine) multiple columns into a single column
df_concat.show()
df2 = df_concat.select(F.concat(df_concat.firstname, df_concat.middlename, df_concat.lastname).alias("FullName"),"dob","gender","salary")
df2.show(truncate=False)


+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

+--------------+----------+------+------+
|FullName      |dob       |gender|salary|
+--------------+----------+------+------+
|JamesSmith    |1991-04-01|M     |3000  |
|MichaelRose   |2000-05-19|M     |4000  |
|RobertWilliams|1978-09-05|M     |4000  |
|MariaAnneJones|1967-12-01|F     |4000  |
|JenMaryBrown  |1980-02-17|F     |-1    |
+--------------+----------+------+------+



In [14]:
# F.explode(col)
# Returns a new row for each element in the given array or map.
# When an array is passed to this function, it creates a new default column “col1” and it contains all array elements. 
# When a map is passed, it creates two new columns one for key and one for value and each element in map split into the rows.
# F.explode will ignore elements that have null or empty, if there are nulls use explode_outer()

# Show the initial dataframe
df_explode.printSchema()
df_explode.show()


# F.explode() – array column example, this ignores nulls so be carefull when you use it
df2 = df_explode.select(df_explode.name, explode(df_explode.knownLanguages))
df2.printSchema()
df2.show()


# F.explode() – map column example, this ignores nulls so be carefull when you use it
df3 = df_explode.select(df_explode.name, explode(df_explode.properties))
df3.printSchema()
df3.show()


# F.explode_outer()
# Create rows for each element in an array or map. For na values it creates null
print("F.explode_outer() with array")
df_explode.select(df_explode.name, F.explode_outer(df_explode.knownLanguages)).show()
""" with map """
df_explode.select(df_explode.name, F.explode_outer(df_explode.properties)).show()


# F.posexplode()
# creates a row for each element in the array and creates two columns “pos’ to hold the position of the array element and the ‘col’ to hold the actual array value.
# This will ignore elements that have null or empty.
""" with array """
df_explode.select(df_explode.name, F.posexplode(df_explode.knownLanguages)).show()
""" with map """
df_explode.select(df_explode.name, F.posexplode(df_explode.properties)).show()


# F.posexplode_outer()
# creates a row for each element in the array and creates two columns “pos’ to hold the position of the array element and the ‘col’ to hold the actual array value.
# Unlike posexplode, if the array or map is null or empty, posexplode_outer function returns null, null for pos and col columns.
""" with array """
#df_explode.select($"name", F.posexplode_outer($"knownLanguages")).show()
""" with map """
df_explode.select(df_explode.name, F.posexplode_outer(df_explode.properties)).show()


root
 |-- name: string (nullable = true)
 |-- knownLanguages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+-------------------+--------------------+
|      name|     knownLanguages|          properties|
+----------+-------------------+--------------------+
|     James|      [Java, Scala]|{eye -> brown, ha...|
|   Michael|[Spark, Java, null]|{eye -> null, hai...|
|    Robert|         [CSharp, ]|{eye -> , hair ->...|
|Washington|               null|                null|
| Jefferson|             [1, 2]|                  {}|
+----------+-------------------+--------------------+

root
 |-- name: string (nullable = true)
 |-- col: string (nullable = true)

+---------+------+
|     name|   col|
+---------+------+
|    James|  Java|
|    James| Scala|
|  Michael| Spark|
|  Michael|  Java|
|  Michael|  null|
|   Robert|CSharp|
|   Robert|      |

In [7]:
# F.when()
# Evaluates a list of conditions and returns one of multiple possible result expressions.
# when(condition, value)
df_when.show()
df2 = df_when.withColumn("new_gender", when(df_when.gender == "M","Male")
                                 .when(df_when.gender == "F","Female")
                                 .when(df_when.gender.isNull() ,"")
                                 .otherwise(df_when.gender))
df2.show()

+-------+------+------+
|   name|gender|salary|
+-------+------+------+
|  James|     M| 60000|
|Michael|     M| 70000|
| Robert|  null|400000|
|  Maria|     F|500000|
|    Jen|      |  null|
+-------+------+------+

+-------+------+------+----------+
|   name|gender|salary|new_gender|
+-------+------+------+----------+
|  James|     M| 60000|      Male|
|Michael|     M| 70000|      Male|
| Robert|  null|400000|          |
|  Maria|     F|500000|    Female|
|    Jen|      |  null|          |
+-------+------+------+----------+



In [8]:
# F.regexp_replace()
# WORKS ON: StringType
# regexp_replace(str, pattern, replacement)
# Replace all substrings of the specified string value that match regexp with rep.


address = [(1,"14851 Jeffrey Rd","DE"),
    (2,"43421 Margarita St","NY"),
    (3,"13111 Siemon Ave","CA")]
df = spark.createDataFrame(address,["id","address","state"])
df.show()

df.withColumn('address', F.regexp_replace('address', 'Rd', 'Road')).show()


+---+------------------+-----+
| id|           address|state|
+---+------------------+-----+
|  1|  14851 Jeffrey Rd|   DE|
|  2|43421 Margarita St|   NY|
|  3|  13111 Siemon Ave|   CA|
+---+------------------+-----+

+---+------------------+-----+
| id|           address|state|
+---+------------------+-----+
|  1|14851 Jeffrey Road|   DE|
|  2|43421 Margarita St|   NY|
|  3|  13111 Siemon Ave|   CA|
+---+------------------+-----+



In [12]:
# F.split()
# split(str, pattern[, limit])
# Splits a String (!!!) around matches of the given pattern.
# If you want to split a date (01-01-1001), then you need to transform it to a string first
# Alternatives exist how to split, make sure to check them out

df_split.printSchema()
df_split.show()
df1 = df_split.withColumn('year', split(df_split['dob'], '-').getItem(0)) \
       .withColumn('month', split(df_split['dob'], '-').getItem(1)) \
       .withColumn('day', split(df_split['dob'], '-').getItem(2))
df1.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)

+---------+----------+--------+----------+
|firstname|middlename|lastname|       dob|
+---------+----------+--------+----------+
|    James|          |   Smith|1991-04-01|
|  Michael|      Rose|        |2000-05-19|
|   Robert|          |Williams|1978-09-05|
|    Maria|      Anne|   Jones|1967-12-01|
|      Jen|      Mary|   Brown|1980-02-17|
+---------+----------+--------+----------+

+---------+----------+--------+----------+----+-----+---+
|firstname|middlename|lastname|dob       |year|month|day|
+---------+----------+--------+----------+----+-----+---+
|James    |          |Smith   |1991-04-01|1991|04   |01 |
|Michael  |Rose      |        |2000-05-19|2000|05   |19 |
|Robert   |          |Williams|1978-09-05|1978|09   |05 |
|Maria    |Anne      |Jones   |1967-12-01|1967|12   |01 |
|Jen      |Mary      |Brown   |1980-02-17|

In [13]:

# F.rank()
# Window function: returns the rank of rows within a window partition.
df_rank.show(truncate=False)
windowSpec  = Window.partitionBy("department").orderBy("salary")
df_rank.withColumn("rank", F.rank().over(windowSpec)).show()
df_rank.show(truncate=False)


+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        James|     Sales|  3000|   1|
|        James|     Sales|  3000|   1|
|       Robert|     Sales|  4100|   3|
|         Saif|     Sales|  4100|   3|
|      Michael|     Sales|  4600|   5|
|        Maria|   Finance|  3000|   1|
|        Scott|   Finance|  3300|   2|
|          Jen|   Finance|  3900|   3|
|        Kumar| Marketing|  2000|   1|
|         Jeff| Marketing|  3000|   2|
+-------------+-

In [11]:
# F.broadcast(df)
"""
- https://sparkbyexamples.com/pyspark/pyspark-broadcast-variables/
- Marks a DataFrame as small enough for use in broadcast joins.
- In PySpark RDD and DataFrame, Broadcast variables are read-only shared variables that are cached and available on all nodes in a cluster in-order to access or use by the tasks. Instead of sending this data along with every task, PySpark distributes broadcast variables to the workers using efficient broadcast algorithms to reduce communication costs.
- Broadcast variables are used in the same way for RDD, DataFrame.
- When you run a PySpark RDD, DataFrame applications that have the Broadcast variables defined and used, PySpark does the following.
    - PySpark breaks the job into stages that have distributed shuffling and actions are executed with in the stage.
    - Later Stages are also broken into tasks
    - Spark broadcasts the common data (reusable) needed by tasks within each stage.
    - The broadcasted data is cache in serialized format and deserialized before executing each task.
- When to use: You should be creating and using broadcast variables for data that shared across multiple stages and tasks. 
"""

### How to use
# The PySpark Broadcast is created using the broadcast(v) method of the SparkContext class.

# PySpark RDD Broadcast variable example
states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]

rdd = spark.sparkContext.parallelize(data)

def state_convert(code):
    return broadcastStates.value[code]

result = rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3]))).collect()
print(result)



# PySpark DataFrame Broadcast variable example
states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")]

columns = ["firstname","lastname","country","state"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

def state_convert(code):
    return broadcastStates.value[code]

result = df.rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3]))).toDF(columns)
result.show(truncate=False)

[('James', 'Smith', 'USA', 'California'), ('Michael', 'Rose', 'USA', 'New York'), ('Robert', 'Williams', 'USA', 'California'), ('Maria', 'Jones', 'USA', 'Florida')]
root
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+

+---------+--------+-------+----------+
|firstname|lastname|country|state     |
+---------+--------+-------+----------+
|James    |Smith   |USA    |California|
|Michael  |Rose    |USA    |New York  |
|Robert   |Williams|USA    |California|
|Maria    |Jones   |USA    |Florida   |
+---------+--------+-------+----------+



In [6]:
# F.array()
# Creates a new array column
# Use array() function to create a new array column by merging the data from multiple columns. All input columns must have the same data type.
# df = df.withColumn("Parch_New", F.when(F.col("Parch")==0, F.array(F.lit("666"), F.lit("777"))))
# The value in the column goes from "666" -> "[666]" OR "[666,777]"
df_array.show()

df_array.select(df_array.name, F.array(df_array.currentState, df_array.previousState).alias("States")).show()


+----------------+------------------+---------------+------------+-------------+--------+
|            name| languagesAtSchool|languagesAtWork|currentState|previousState|     NEW|
+----------------+------------------+---------------+------------+-------------+--------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|          OH|           CA|[OH, CA]|
|   Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|          NY|           NJ|[NY, NJ]|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|          UT|           NV|[UT, NV]|
+----------------+------------------+---------------+------------+-------------+--------+

+----------------+--------+
|            name|  States|
+----------------+--------+
|    James,,Smith|[OH, CA]|
|   Michael,Rose,|[NY, NJ]|
|Robert,,Williams|[UT, NV]|
+----------------+--------+



# DataFrame API

In [13]:
# DF.repartition()
# DataFrame.repartition(numPartitions, *cols)
# Returns a new DataFrame partitioned by the given partitioning expressions. The resulting DataFrame is hash partitioned.
# repartition() and coalesce() are very expensive operations as they shuffle the data across many partitions 
# repartition() is used to increase or decrease the RDD/DataFrame partitions
??? -> Not sure about the code

Object `? -> Not sure about the code` not found.


In [14]:
# df.withColumnRenamed()
# Returns a new DataFrame by renaming an existing column
# DataFrame.withColumnRenamed(existing, new)
df1 = df.withColumnRenamed("dob","DateOfBirth").printSchema()
df2 = df.withColumnRenamed("dob","DateOfBirth").withColumnRenamed("salary","salary_amount")

root
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)



In [15]:
# df.withColumn()
# DataFrame.withColumn(colName, col)
# A transformation function of DF which used to change the value, convert the datatype of an existing column, create a new column, and many more.

df.show()

# 1. Change DataType using PySpark withColumn()
df_wc.withColumn("salary", col("salary").cast("Integer")).show()

# 2. Update The Value of an Existing Column
df_wc.withColumn("salary", col("salary")*100).show()

# 3. Create a Column from an Existing
df_wc.withColumn("CopiedColumn", col("salary")* -1).show()

# 4. Add a New Column using withColumn()
df_wc.withColumn("Country", lit("USA")).show()
df_wc.withColumn("Country", lit("USA")) \
  .withColumn("anotherColumn",lit("anotherValue")) \
  .show()

# 5. Rename Column Name
df_wc.withColumnRenamed("gender","sex").show(truncate=False)


+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|    James|   Smith|    USA|   CA|
|  Michael|    Rose|    USA|   NY|
|   Robert|Williams|    USA|   CA|
|    Maria|   Jones|    USA|   FL|
+---------+--------+-------+-----+

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|199

In [19]:
# DataFrame.schema()
# Returns the schema of this DataFrame as a pyspark.sql.types.StructType
df_wc_two.show()
df_wc_two.printSchema()
maxdate

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|2021-01-01|     M|  3000|
|  Michael|      Rose|        |2021-01-01|     M|  4000|
|   Robert|          |Williams|2021-01-01|     M|  4000|
|    Maria|      Anne|   Jones|2021-01-01|     F|  4000|
|      Jen|      Mary|   Brown|2021-01-01|     F|    -1|
+---------+----------+--------+----------+------+------+

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: date (nullable = false)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



datetime.date(2021, 1, 1)

In [28]:
# DataFrame.agg(*exprs)
# Aggregate on the entire DataFrame without groups (shorthand for df.groupBy().agg())

# Example
# Get the most recent date/file/record a dataset contains
df_wc_two = df_wc.withColumn("dob", df_wc.dob.cast(DateType()))
maxdate = df_wc_two.agg({"dob": "max"}).collect()[0][0]
df_wc_two.filter(F.col("dob")==maxdate).show()



+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|  Michael|      Rose|        |2021-01-01|     M|  4000|
|   Robert|          |Williams|2021-01-01|     M|  4000|
+---------+----------+--------+----------+------+------+



In [18]:
# DataFrame.collect()
# Returns all the records as a list of Row.
# PySpark RDD/DataFrame collect() is an action operation that is used to retrieve all the elements of the dataset (from all nodes) to the driver node. We should use the collect() on smaller dataset usually after filter(), group() e.t.c. Retrieving larger datasets results in OutOfMemory error

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.show(truncate=False)

# deptDF.collect() retrieves all elements in a DataFrame as an Array of Row type to the driver node. printing a resultant array yields the below output.
# Note that collect() is an action hence it does not return a DataFrame instead, it returns data in an Array to the driver.
dataCollect = deptDF.collect()
print(dataCollect)


# When to avoid collect
# Usually, collect() is used to retrieve the action output when you have very small result set and calling collect() on an RDD/DataFrame with a bigger result set causes out of memory as it returns the entire dataset (from all workers) to the driver hence we should avoid calling collect() on a larger dataset.

# collect () vs select ()
# select() is a transformation that returns a new DataFrame and holds the columns that are selected whereas collect() is an action that returns the entire data set in an Array to the driver.

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+

[Row(dept_name='Finance', dept_id=10), Row(dept_name='Marketing', dept_id=20), Row(dept_name='Sales', dept_id=30), Row(dept_name='IT', dept_id=40)]


In [19]:
# F.collect_list()
# Returns all values from an input column with duplicates.
df_collectlist.show(truncate=False)
df_collectlist.select(F.collect_list("salary")).show(truncate=False)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+

+------------------------------------------------------------+
|collect_list(salary)                                        |
+------------------------------------------------------------+
|[3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]|
+------------------------------------------------------------+



In [21]:
# collect_set
# Returns all values from an input column with duplicate values eliminated. 

df_collectlist.select(F.collect_set("employee_name")).show(truncate=False)

+--------------------------------------------------------------+
|collect_set(employee_name)                                    |
+--------------------------------------------------------------+
|[Robert, Kumar, Jeff, Maria, Scott, Michael, Saif, Jen, James]|
+--------------------------------------------------------------+



In [22]:
# DataFrame.filter(condition)
# https://sparkbyexamples.com/pyspark/pyspark-where-filter/
# Filters rows using the given condition
# Use this to subset a dataset
df_filter.show(truncate=False)


# Both of these produce the same results
df_filter.filter(df_filter.state == "OH").show(truncate=False)
df_filter.filter(F.col("state") == "OH").show(truncate=False) 

# Multiple Conditions
df_filter.filter( (df_filter.state  == "OH") & (df_filter.gender  == "M") ).show(truncate=False) 

#Filter IS IN List values
li=["OH","CA","DE"]
df_filter.filter(df_filter.state.isin(li)).show()

# Filter NOT IS IN List values
#These show all records with NY (NY is not part of the list)
df_filter.filter(~df_filter.state.isin(li)).show()
df_filter.filter(df_filter.state.isin(li)==False).show()

# Using startswith
df_filter.filter(df_filter.state.startswith("N")).show()

#using endswith
df_filter.filter(df_filter.state.endswith("H")).show()

#contains
df_filter.filter(df_filter.state.contains("H")).show()

# like - SQL LIKE pattern
#df_filter.filter(df_filter.gender2.like("%ale%")).show()
df_filter.filter(df_filter.name.firstname.like("%ia%")).show()

# Filter on Array collumn
from pyspark.sql.functions import array_contains
df_filter.filter(array_contains(df_filter.languages,"Java")).show(truncate=False) 

# Filter on nested Struct condition
df_filter.filter(df_filter.name.lastname == "Williams").show(truncate=False) 

+----------------------+------------------+-----+------+-------+
|name                  |languages         |state|gender|gender2|
+----------------------+------------------+-----+------+-------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |Male   |
|{Anna, Rose, }        |[Spark, Java, C++]|NY   |F     |Female |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |Male   |
|{Maria, Anne, Jones}  |[CSharp, VB]      |NY   |M     |Female |
|{Jen, Mary, Brown}    |[CSharp, VB]      |NY   |M     |Male   |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |Female |
+----------------------+------------------+-----+------+-------+

+----------------------+------------------+-----+------+-------+
|name                  |languages         |state|gender|gender2|
+----------------------+------------------+-----+------+-------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |Male   |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |Male   |
|{Mike, Mary, Williams}|

In [23]:
# DataFrame.toDF(*cols)
# Returns a new DataFrame that with new specified column names
# https://sparkbyexamples.com/pyspark/different-ways-to-create-dataframe-in-pyspark/

In [24]:
# DataFrame.select(*cols)
# Projects a set of expressions and returns a new DataFrame

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")]
columns = ["firstname","lastname","country","state"]
df = spark.createDataFrame(data = data, schema = columns)
df.show(truncate=False)

# Selecting single and multiple columns
df.select("firstname","lastname").show()
df.select(df.firstname,df.lastname).show()
df.select(df["firstname"],df["lastname"]).show()
#By using col() function
df.select(col("firstname"),col("lastname")).show()
#Select columns by regular expression
df.select(df.colRegex("`^.*name*`")).show()


# Select all columns
# Select All columns from List
df.select(*columns).show()
# Select All columns
df.select([col for col in df.columns]).show()
df.select("*").show()


+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+

+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+

+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+

+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+

+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+

+---------+--------+
|firstname|lastname|
+

In [25]:
# DF.drop()
# Drop Column From PySpark DataFrame
df.drop("salary").show()

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|    James|   Smith|    USA|   CA|
|  Michael|    Rose|    USA|   NY|
|   Robert|Williams|    USA|   CA|
|    Maria|   Jones|    USA|   FL|
+---------+--------+-------+-----+



In [None]:
def filterfunction():
    for i in range(20):
        print("Bobi e naipametan i naiubav i naidobar ujko!!!")

filterfunction()

# Combining tables

In [26]:
# DF.union()
# Return a new DataFrame containing union of rows in this and another DataFrame.
df1.union(df2)

NameError: name 'DataFrame' is not defined

In [None]:
# DF.join(DF)

# One condition
df1.join(df2, )
# Two conditions
df1.join(df2, ((df1.col1==df2.col1) & (df1.col8==df2.col8)), "left")




# Column API

In [None]:
# F.col()
# Returns a Column based on the given column name.
df.select(F.col("name")).show()


In [None]:
# Column.asc()
# Returns a sort expression based on ascending order of the column.


In [27]:
# Column.when(condition, value)
# Evaluates a list of conditions and returns one of multiple possible result expressions.
data = [("James","M",100),
        ("Michael","M",400),
        ("Robert",None,500),
        ("Maria","F",800),
        ("Jen","",None)]

columns = ["name","gender","salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.show()

df2 = df.withColumn("new_gender", when(df.gender == "M","Male"))

+-------+------+------+
|   name|gender|salary|
+-------+------+------+
|  James|     M|   100|
|Michael|     M|   400|
| Robert|  null|   500|
|  Maria|     F|   800|
|    Jen|      |  null|
+-------+------+------+



In [28]:
# Column.otherwise(value)
# Evaluates a list of conditions and returns one of multiple possible result expressions.
df = df.withColumn("new_gender", when(df.gender == "M","Male").otherwise(df.gender))
df.show()

+-------+------+------+----------+
|   name|gender|salary|new_gender|
+-------+------+------+----------+
|  James|     M|   100|      Male|
|Michael|     M|   400|      Male|
| Robert|  null|   500|      null|
|  Maria|     F|   800|         F|
|    Jen|      |  null|          |
+-------+------+------+----------+



In [29]:
# Column.isNotNull()
# True if the current expression is NOT null.
df.select(col("salary").isNotNull()).show()

+--------------------+
|(salary IS NOT NULL)|
+--------------------+
|                true|
|                true|
|                true|
|                true|
|               false|
+--------------------+



In [30]:
# Column.isNull()
# True if the current expression is null.
df.select(col("salary").isNull()).show()

+----------------+
|(salary IS NULL)|
+----------------+
|           false|
|           false|
|           false|
|           false|
|            true|
+----------------+



In [32]:
# Column.between(lowerBound, upperBound)
df.select(col("salary").between(200,700)).show()

+-------------------------------------+
|((salary >= 200) AND (salary <= 700))|
+-------------------------------------+
|                                false|
|                                false|
|                                false|
+-------------------------------------+



In [33]:
# Column.cast(dataType)
# Convert the column into type dataType

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

simpleData = [("James",34,"2006-01-01","true","M",3000.60),
    ("Michael",33,"1980-01-10","true","F",3300.80),
    ("Robert",37,"06-01-1992","false","M",5000.50)
  ]

columns = ["firstname","age","jobStartDate","isGraduated","gender","salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)

# Convert String to Integer Type
df.withColumn("age", df.age.cast(IntegerType()))
df.withColumn("age", df.age.cast('int'))
df.withColumn("age", df.age.cast('integer'))

# Using select
df.select(col("age").cast('int').alias("age"))

#Using selectExpr()
df.selectExpr("cast(age as int) age")

DataFrame[age: int]

# Dates and Time Stuff

In [34]:
df.select(F.current_date().alias("current_date")).show()

+------------+
|current_date|
+------------+
|  2021-10-12|
|  2021-10-12|
|  2021-10-12|
+------------+



# Else
- F.udf()


In [None]:
# To Do

-

- F.month()
- F.to_date()

- .repartition()
- .distinct()

- .like()
- .orderBy()
- .over()
- alerts.join()

- utils.union_hubs()
- workflow=workflow.select()
- Window.partitionBy()

-df.printSchema()

In [None]:
######################################################################
from pyspark.sql.functions import array_distinct
B = ["Bobi", "Blerina", "Misina", "Tyler"]
A = ["Bobi", "Blerina", "Misina", "Leo"]

def doubles(B,A):
    unique = []
    for i in A:
        unique.append(i)
    for i in B:
        unique.append(i)
    return unique

dub = doubles(B, A)
#array_distinct(dub)

In [None]:
def rename_column(df, prefix):
    '''
    This is the function we use at CS to rename columns
    '''
    for name in df.columns:
        if prefix not in name:
            df_new = df.withColumnRenamed(name, prefix + "__" + name)
            df = df_new
        else:
            raise ValueError('The chosen prefix already appers in the dataframe.')
    return df

titanic_new = rename_column(titanic, 'OK')
titanic_new.show(truncate=False)

In [None]:
def bobis_function():
    for i in range(1):
        print("Bujaa")
        
bobis_function()