In [13]:
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.


0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done


In [14]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [15]:
!ls

sample_data  spark-3.1.1-bin-hadoop3.2	spark-3.1.1-bin-hadoop3.2.tgz


In [16]:
# Initialize findspark
import findspark
findspark.init()

In [17]:
# Create a PySpark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [70]:
from pyspark.sql.functions import col, lit, array_contains

from pyspark.sql.types import StructField, StructType, StringType, IntegerType, ArrayType

In [18]:
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns =  ["firstanme", "middlename", "lastename", "dob", "gender", "salary"]

df = spark.createDataFrame(data = data, schema=columns)

In [19]:
df.show()

+---------+----------+---------+----------+------+------+
|firstanme|middlename|lastename|       dob|gender|salary|
+---------+----------+---------+----------+------+------+
|    James|          |    Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|         |2000-05-19|     M|  4000|
|   Robert|          | Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|    Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|    Brown|1980-02-17|     F|    -1|
+---------+----------+---------+----------+------+------+



In [29]:
df.printSchema()

root
 |-- firstanme: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastename: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [30]:
df = df.withColumn("salarys", col("salary").cast("Integer"))

In [31]:
df.printSchema()

root
 |-- firstanme: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastename: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- salarys: integer (nullable = true)



In [33]:
df.withColumn("salary", col("salary")* 100).show()

+---------+----------+---------+----------+------+------+-------+
|firstanme|middlename|lastename|       dob|gender|salary|salarys|
+---------+----------+---------+----------+------+------+-------+
|    James|          |    Smith|1991-04-01|     M|300000|   3000|
|  Michael|      Rose|         |2000-05-19|     M|400000|   4000|
|   Robert|          | Williams|1978-09-05|     M|400000|   4000|
|    Maria|      Anne|    Jones|1967-12-01|     F|400000|   4000|
|      Jen|      Mary|    Brown|1980-02-17|     F|  -100|     -1|
+---------+----------+---------+----------+------+------+-------+



In [35]:
df.withColumn("country", lit("usa")).show()

+---------+----------+---------+----------+------+------+-------+-------+
|firstanme|middlename|lastename|       dob|gender|salary|salarys|country|
+---------+----------+---------+----------+------+------+-------+-------+
|    James|          |    Smith|1991-04-01|     M|  3000|   3000|    usa|
|  Michael|      Rose|         |2000-05-19|     M|  4000|   4000|    usa|
|   Robert|          | Williams|1978-09-05|     M|  4000|   4000|    usa|
|    Maria|      Anne|    Jones|1967-12-01|     F|  4000|   4000|    usa|
|      Jen|      Mary|    Brown|1980-02-17|     F|    -1|     -1|    usa|
+---------+----------+---------+----------+------+------+-------+-------+



In [36]:
df.withColumn("newcolonne", lit("ivoirien"))\
  .withColumn("anothercolonne", lit("anothervalue"))\
  .show()

+---------+----------+---------+----------+------+------+-------+----------+--------------+
|firstanme|middlename|lastename|       dob|gender|salary|salarys|newcolonne|anothercolonne|
+---------+----------+---------+----------+------+------+-------+----------+--------------+
|    James|          |    Smith|1991-04-01|     M|  3000|   3000|  ivoirien|  anothervalue|
|  Michael|      Rose|         |2000-05-19|     M|  4000|   4000|  ivoirien|  anothervalue|
|   Robert|          | Williams|1978-09-05|     M|  4000|   4000|  ivoirien|  anothervalue|
|    Maria|      Anne|    Jones|1967-12-01|     F|  4000|   4000|  ivoirien|  anothervalue|
|      Jen|      Mary|    Brown|1980-02-17|     F|    -1|     -1|  ivoirien|  anothervalue|
+---------+----------+---------+----------+------+------+-------+----------+--------------+



In [39]:
df.withColumnRenamed("gender", "sex")\
.show(truncate = False)

+---------+----------+---------+----------+---+------+-------+
|firstanme|middlename|lastename|dob       |sex|salary|salarys|
+---------+----------+---------+----------+---+------+-------+
|James    |          |Smith    |1991-04-01|M  |3000  |3000   |
|Michael  |Rose      |         |2000-05-19|M  |4000  |4000   |
|Robert   |          |Williams |1978-09-05|M  |4000  |4000   |
|Maria    |Anne      |Jones    |1967-12-01|F  |4000  |4000   |
|Jen      |Mary      |Brown    |1980-02-17|F  |-1    |-1     |
+---------+----------+---------+----------+---+------+-------+



In [41]:
df.drop("salarys").show()

+---------+----------+---------+----------+------+------+
|firstanme|middlename|lastename|       dob|gender|salary|
+---------+----------+---------+----------+------+------+
|    James|          |    Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|         |2000-05-19|     M|  4000|
|   Robert|          | Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|    Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|    Brown|1980-02-17|     F|    -1|
+---------+----------+---------+----------+------+------+



In [43]:
data = [
    (("James","","Smith"),["Java","Scala","C++"],"OH","M"),
    (("Anna","Rose",""),["Spark","Java","C++"],"NY","F"),
    (("Julia","","Williams"),["CSharp","VB"],"OH","F"),
    (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"),
    (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"),
    (("Mike","Mary","Williams"),["Python","VB"],"OH","M")
 ]

schema = StructType([
     StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
         StructField('lastname', StringType(), True)
     ])),
     StructField('languages', ArrayType(StringType()), True),
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True)
 ])

df = spark.createDataFrame(data = data, schema = schema)
df.printSchema()
df.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Anna, Rose, }        |[Spark, Java, C++]|NY   |F     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Maria, Anne, Jones}  |[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}    |[CSharp, VB]      |NY   |M     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+



In [44]:
df.filter(df.state == "OH").show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
| {Julia, , Williams}|      [CSharp, VB]|   OH|     F|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [45]:
df.filter(df.state != "OH").show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|     M|
+--------------------+------------------+-----+------+



In [47]:
df.filter(~(df.state == "OH"))\
.filter(df.gender != "F")\
.show()

+--------------------+------------+-----+------+
|                name|   languages|state|gender|
+--------------------+------------+-----+------+
|{Maria, Anne, Jones}|[CSharp, VB]|   NY|     M|
|  {Jen, Mary, Brown}|[CSharp, VB]|   NY|     M|
+--------------------+------------+-----+------+



In [51]:
df.filter((df.state == "OH") & (df.gender == "M")).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [52]:
element = ["OH", "CA", "DE"]

df.filter(df.state.isin(element)).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
| {Julia, , Williams}|      [CSharp, VB]|   OH|     F|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [53]:
df.filter(~df.state.isin(element)).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|     M|
+--------------------+------------------+-----+------+



In [54]:
df.filter(df.state.isin(element) == False).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|     M|
+--------------------+------------------+-----+------+



In [55]:
df.filter(df.state.startswith('N')).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|     M|
+--------------------+------------------+-----+------+



In [56]:
df.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)



In [60]:
# Prepare Data
data2 = [(2,"Michael Rose"),(3,"Robert Williams"),
     (4,"Rames Rose"),(5,"Rames rose")
  ]
df2 = spark.createDataFrame(data = data2, schema = ["id","name"])

In [61]:
df2.show()

+---+---------------+
| id|           name|
+---+---------------+
|  2|   Michael Rose|
|  3|Robert Williams|
|  4|     Rames Rose|
|  5|     Rames rose|
+---+---------------+



In [63]:
df2.filter(df2.name.like("%ll%")).show()

+---+---------------+
| id|           name|
+---+---------------+
|  3|Robert Williams|
+---+---------------+



In [68]:
df2.filter(df2.name.rlike("(?i)^*rose$")).show()

+---+------------+
| id|        name|
+---+------------+
|  2|Michael Rose|
|  4|  Rames Rose|
|  5|  Rames rose|
+---+------------+



In [69]:
df.show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
| {Julia, , Williams}|      [CSharp, VB]|   OH|     F|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|     M|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [71]:
df.filter(array_contains(df.languages, "VB")).show()

+--------------------+------------+-----+------+
|                name|   languages|state|gender|
+--------------------+------------+-----+------+
| {Julia, , Williams}|[CSharp, VB]|   OH|     F|
|{Maria, Anne, Jones}|[CSharp, VB]|   NY|     M|
|  {Jen, Mary, Brown}|[CSharp, VB]|   NY|     M|
|{Mike, Mary, Will...|[Python, VB]|   OH|     M|
+--------------------+------------+-----+------+



In [72]:
df.withColumn("n_state", col("languages")[0]).show()

+--------------------+------------------+-----+------+-------+
|                name|         languages|state|gender|n_state|
+--------------------+------------------+-----+------+-------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|   Java|
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|  Spark|
| {Julia, , Williams}|      [CSharp, VB]|   OH|     F| CSharp|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M| CSharp|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|     M| CSharp|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M| Python|
+--------------------+------------------+-----+------+-------+



In [73]:
data = [("James", "Sales", 3000), \
    ("Michael", "Sales", 4600), \
    ("Robert", "Sales", 4100), \
    ("Maria", "Finance", 3000), \
    ("James", "Sales", 3000), \
    ("Scott", "Finance", 3300), \
    ("Jen", "Finance", 3900), \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000), \
    ("Saif", "Sales", 4100) \
  ]

# Create DataFrame
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [74]:
df2 = df.dropDuplicates()

In [76]:
print(str(df2.count()))

9


In [77]:
print(str(df.count()))

10


In [78]:
simpleData = [("James","Sales","NY",90000,34,10000), \
    ("Michael","Sales","NY",86000,56,20000), \
    ("Robert","Sales","CA",81000,30,23000), \
    ("Maria","Finance","CA",90000,24,23000), \
    ("Raman","Finance","CA",99000,40,24000), \
    ("Scott","Finance","NY",83000,36,19000), \
    ("Jen","Finance","NY",79000,53,15000), \
    ("Jeff","Marketing","CA",80000,25,18000), \
    ("Kumar","Marketing","NY",91000,50,21000) \
  ]
columns= ["employee_name","department","state","salary","age","bonus"]
# Create SparkSession

df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



In [79]:
from pyspark.sql.functions import asc, desc

In [88]:
df.sort(col("salary"), col("age")).show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        James|     Sales|   NY| 90000| 34|10000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|        Raman|   Finance|   CA| 99000| 40|24000|
+-------------+----------+-----+------+---+-----+



In [95]:
df.sort(df.age, df.salary, ascending=  [True, True]).show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        Maria|   Finance|   CA| 90000| 24|23000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        James|     Sales|   NY| 90000| 34|10000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|      Michael|     Sales|   NY| 86000| 56|20000|
+-------------+----------+-----+------+---+-----+



In [97]:
df.orderBy("department").show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        Maria|   Finance|   CA| 90000| 24|23000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|        James|     Sales|   NY| 90000| 34|10000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|      Michael|     Sales|   NY| 86000| 56|20000|
+-------------+----------+-----+------+---+-----+



In [98]:
df.sort(df.age.asc(), df.salary.asc()).show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        Maria|   Finance|   CA| 90000| 24|23000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        James|     Sales|   NY| 90000| 34|10000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|      Michael|     Sales|   NY| 86000| 56|20000|
+-------------+----------+-----+------+---+-----+



In [103]:
df.sort(col('salary').desc()).sort(col('age').asc()).show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        Maria|   Finance|   CA| 90000| 24|23000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        James|     Sales|   NY| 90000| 34|10000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|      Michael|     Sales|   NY| 86000| 56|20000|
+-------------+----------+-----+------+---+-----+



In [104]:
simpleData = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]

schema = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema = schema)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



In [105]:
df.groupBy("department").sum("salary").show()

+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|     Sales|     257000|
|   Finance|     351000|
| Marketing|     171000|
+----------+-----------+



In [107]:
df.groupBy("department").count().show()

+----------+-----+
|department|count|
+----------+-----+
|     Sales|    3|
|   Finance|    4|
| Marketing|    2|
+----------+-----+



In [109]:
df.groupBy('department', "state").sum("salary", "bonus").withColumnRenamed("sum(salary)", "sum salaire").show()

+----------+-----+-----------+----------+
|department|state|sum salaire|sum(bonus)|
+----------+-----+-----------+----------+
|   Finance|   NY|     162000|     34000|
| Marketing|   NY|      91000|     21000|
|     Sales|   CA|      81000|     23000|
| Marketing|   CA|      80000|     18000|
|   Finance|   CA|     189000|     47000|
|     Sales|   NY|     176000|     30000|
+----------+-----+-----------+----------+



In [111]:
from pyspark.sql.functions import sum, avg, max

In [115]:
df.groupBy("department").agg(sum("salary").alias("sum_salary"),\
                              avg("salary").alias("avg_salary")).show()

+----------+----------+-----------------+
|department|sum_salary|       avg_salary|
+----------+----------+-----------------+
|     Sales|    257000|85666.66666666667|
|   Finance|    351000|          87750.0|
| Marketing|    171000|          85500.0|
+----------+----------+-----------------+



In [116]:

emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)

dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+-----

In [117]:
empDF.show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+



In [118]:
deptDF.show(truncate=False)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



In [120]:
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "full").show()

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     6|   Brown|              2|       2010|         50|      |    -1|     null|   null|
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|  null|    null|           null|       null|       null|  null|  null|    Sales|     30|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [122]:
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

In [123]:
spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [124]:
# Create DataFrame df1 with columns name, and id
data = [("James",34), ("Michael",56), \
        ("Robert",30), ("Maria",24) ]

df1 = spark.createDataFrame(data = data, schema=["name","id"])
df1.printSchema()

# Create DataFrame df2 with columns name and id
data2=[(34,"James"),(45,"Maria"), \
       (45,"Jen"),(34,"Jeff")]

df2 = spark.createDataFrame(data = data2, schema = ["id","name"])
df2.printSchema()

root
 |-- name: string (nullable = true)
 |-- id: long (nullable = true)

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [125]:
df1.show()

+-------+---+
|   name| id|
+-------+---+
|  James| 34|
|Michael| 56|
| Robert| 30|
|  Maria| 24|
+-------+---+



In [126]:
df2.show()

+---+-----+
| id| name|
+---+-----+
| 34|James|
| 45|Maria|
| 45|  Jen|
| 34| Jeff|
+---+-----+



In [127]:
df3 = df1.unionByName(df2)
df3.show()

+-------+---+
|   name| id|
+-------+---+
|  James| 34|
|Michael| 56|
| Robert| 30|
|  Maria| 24|
|  James| 34|
|  Maria| 45|
|    Jen| 45|
|   Jeff| 34|
+-------+---+



In [128]:
df1 = spark.createDataFrame([[5, 2, 6]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[6, 7, 3]], ["col1", "col2", "col3"])

In [129]:
df1.show(), df2.show()

+----+----+----+
|col0|col1|col2|
+----+----+----+
|   5|   2|   6|
+----+----+----+

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   6|   7|   3|
+----+----+----+



(None, None)

In [130]:
df3 = df1.unionByName(df2, allowMissingColumns=True,)
df3.show()

+----+----+----+----+
|col0|col1|col2|col3|
+----+----+----+----+
|   5|   2|   6|null|
|null|   6|   7|   3|
+----+----+----+----+



In [131]:
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



In [147]:
def convertCase(str):
  resStr = ""
  arr = str.split(" ")
  for x in arr:
    resStr = resStr + x[0:1].upper() + x[1:len(x)] + " "
  resStr = resStr[0:len(resStr)-1]
  return resStr

In [148]:
string = "venir me voir"

In [149]:
convertCase(string)

'Venir Me Voir'

In [153]:
convertCase

In [150]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

In [152]:
convertUDF = udf(lambda z: convertCase(z), StringType())

In [154]:
convertUDF

<function __main__.<lambda>(z)>

In [158]:
df.select(col("Seqno"), convertUDF(col("Name")).alias("Name")).show()

+-----+------------+
|Seqno|        Name|
+-----+------------+
|    1|  John Jones|
|    2|Tracey Smith|
|    3| Amy Sanders|
+-----+------------+



In [159]:
def upperCase(str):
  return str.upper()

In [165]:
upperCaseUDF = udf(lambda x: upperCase(x))

In [166]:
df.withColumn("Cureated Name", upperCaseUDF(col("Name"))).show()

+-----+------------+-------------+
|Seqno|        Name|Cureated Name|
+-----+------------+-------------+
|    1|  john jones|   JOHN JONES|
|    2|tracey smith| TRACEY SMITH|
|    3| amy sanders|  AMY SANDERS|
+-----+------------+-------------+



In [163]:
df.show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



In [167]:
# Prepare Data
simpleData = (("Java",4000,5), \
    ("Python", 4600,10),  \
    ("Scala", 4100,15),   \
    ("Scala", 4500,15),   \
    ("PHP", 3000,20),  \
  )
columns= ["CourseName", "fee", "discount"]

# Create DataFrame
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- CourseName: string (nullable = true)
 |-- fee: long (nullable = true)
 |-- discount: long (nullable = true)

+----------+----+--------+
|CourseName|fee |discount|
+----------+----+--------+
|Java      |4000|5       |
|Python    |4600|10      |
|Scala     |4100|15      |
|Scala     |4500|15      |
|PHP       |3000|20      |
+----------+----+--------+



In [184]:
from pyspark.sql.functions import upper

def to_upper_str_columns(df):
    return df.withColumn("CourseName",upper(df.CourseName))

In [186]:
df.transform(to_upper_str_columns).show()

+----------+----+--------+
|CourseName| fee|discount|
+----------+----+--------+
|      JAVA|4000|       5|
|    PYTHON|4600|      10|
|     SCALA|4100|      15|
|     SCALA|4500|      15|
|       PHP|3000|      20|
+----------+----+--------+



In [187]:
def select_columns(df):
    return df.select("CourseName","discounted_fee")

In [None]:
import pyspark.pandas as ps

In [195]:
import numpy as np

In [196]:
technologies = ({
    'Fee' :[20000,25000,30000,22000,np.NaN],
    'Discount':[1000,2500,1500,1200,3000]
               })


In [197]:
technologies

{'Fee': [20000, 25000, 30000, 22000, nan],
 'Discount': [1000, 2500, 1500, 1200, 3000]}

In [198]:
df = ps.DataFrame(technologies)

AttributeError: module 'pyspark.sql.pandas' has no attribute 'DataFrame'

In [200]:
data = ["Project","Gutenberg’s","Alice’s","Adventures",
"in","Wonderland","Project","Gutenberg’s","Adventures",
"in","Wonderland","Project","Gutenberg’s"]

rdd=spark.sparkContext.parallelize(data)

In [203]:
rdd.collect()

['Project',
 'Gutenberg’s',
 'Alice’s',
 'Adventures',
 'in',
 'Wonderland',
 'Project',
 'Gutenberg’s',
 'Adventures',
 'in',
 'Wonderland',
 'Project',
 'Gutenberg’s']

In [204]:
rdd2 = rdd.map(lambda x: (x, 1))

In [206]:
for element in rdd2.collect():
  print(element)

('Project', 1)
('Gutenberg’s', 1)
('Alice’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)


In [207]:
# Prepare Data
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

# Create DataFrame
df = spark.createDataFrame(data=data,schema=columns)
df.show()

+-----+------------+
|Seqno|        Name|
+-----+------------+
|    1|  john jones|
|    2|tracey smith|
|    3| amy sanders|
+-----+------------+



In [211]:
def f(df):
  print(df.Seqno)


In [214]:
df.foreach(f)

In [215]:
accum=spark.sparkContext.accumulator(0)
rdd=spark.sparkContext.parallelize([1,2,3,4,5])

In [216]:
accum

Accumulator<id=0, value=0>

In [217]:
rdd.foreach(lambda x: accum.add(x))

In [218]:
print(accum.value)

15


In [219]:
df = spark.range(100)
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
only showing top 20 rows



In [220]:
df2 = df.select((df.id % 3).alias("key"))

In [221]:
df2.show()

+---+
|key|
+---+
|  0|
|  1|
|  2|
|  0|
|  1|
|  2|
|  0|
|  1|
|  2|
|  0|
|  1|
|  2|
|  0|
|  1|
|  2|
|  0|
|  1|
|  2|
|  0|
|  1|
+---+
only showing top 20 rows



In [229]:
print(len(rdd.takeSample(False,10,0)))

5


In [258]:
path = "https://raw.githubusercontent.com/spark-examples/spark-scala-examples/master/src/main/resources/small_zipcode.csv"

In [259]:
from pyspark import SparkFiles

In [260]:
spark.sparkContext.addFile(path)

In [261]:
spark

In [262]:
df = spark.read.csv(SparkFiles.get("small_zipcode.csv"), header = True)

In [242]:
df.show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|               null|   PR|     30100|
|  2|    704|    null|PASEO COSTA DEL SUR|   PR|      null|
|  3|    709|    null|       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|               null|   TX|      null|
+---+-------+--------+-------------------+-----+----------+



In [248]:
df.na.fill("0", subset=["population"]).show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|               null|   PR|     30100|
|  2|    704|    null|PASEO COSTA DEL SUR|   PR|         0|
|  3|    709|    null|       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|               null|   TX|         0|
+---+-------+--------+-------------------+-----+----------+



In [251]:
df.na.fill("").show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|                   |   PR|     30100|
|  2|    704|        |PASEO COSTA DEL SUR|   PR|          |
|  3|    709|        |       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|                   |   TX|          |
+---+-------+--------+-------------------+-----+----------+



In [252]:
df.na.fill("unknow", ["city"])\
.na.fill("", ["type"]).show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|             unknow|   PR|     30100|
|  2|    704|        |PASEO COSTA DEL SUR|   PR|      null|
|  3|    709|        |       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|             unknow|   TX|      null|
+---+-------+--------+-------------------+-----+----------+



In [253]:
data = [("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \
      ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"), \
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")]

columns= ["Product","Amount","Country"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- Product: string (nullable = true)
 |-- Amount: long (nullable = true)
 |-- Country: string (nullable = true)

+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000  |USA    |
|Carrots|1500  |USA    |
|Beans  |1600  |USA    |
|Orange |2000  |USA    |
|Orange |2000  |USA    |
|Banana |400   |China  |
|Carrots|1200  |China  |
|Beans  |1500  |China  |
|Orange |4000  |China  |
|Banana |2000  |Canada |
|Carrots|2000  |Canada |
|Beans  |2000  |Mexico |
+-------+------+-------+



In [257]:
df.groupBy("Product").pivot("Country").sum("Amount").show()

+-------+------+-----+------+----+
|Product|Canada|China|Mexico| USA|
+-------+------+-----+------+----+
| Orange|  null| 4000|  null|4000|
|  Beans|  null| 1500|  2000|1600|
| Banana|  2000|  400|  null|1000|
|Carrots|  2000| 1200|  null|1500|
+-------+------+-----+------+----+



In [263]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- type: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- population: string (nullable = true)



In [264]:
df.write.option("header", True).partitionBy("state").mode("overwrite").csv("zipcode")

In [266]:
ls -lrt zipcode/state=PR

total 4
-rw-r--r-- 1 root root 115 Feb 22 13:48 part-00000-ac0ba6c8-3540-4574-ba1d-2912e5a15bcb.c000.csv


In [269]:
from pyspark.sql.types import StructField, StructType, StringType, MapType
schema = StructType([
    StructField('name', StringType(), True),
    StructField('properties', MapType(StringType(),StringType()),True)
])

dataDictionary = [
        ('James',{'hair':'black','eye':'brown'}),
        ('Michael',{'hair':'brown','eye':None}),
        ('Robert',{'hair':'red','eye':'black'}),
        ('Washington',{'hair':'grey','eye':'grey'}),
        ('Jefferson',{'hair':'brown','eye':''})
        ]

df = spark.createDataFrame(data=dataDictionary, schema = schema)
df.printSchema()


root
 |-- name: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [271]:
df.show(truncate=False)

+----------+-----------------------------+
|name      |properties                   |
+----------+-----------------------------+
|James     |{eye -> brown, hair -> black}|
|Michael   |{eye -> null, hair -> brown} |
|Robert    |{eye -> black, hair -> red}  |
|Washington|{eye -> grey, hair -> grey}  |
|Jefferson |{eye -> , hair -> brown}     |
+----------+-----------------------------+



In [273]:
df.rdd.map(lambda x: (x.name, x.properties['hair'], x.properties['eye'])).toDF(["name", "hair", "eye"]).show()

+----------+-----+-----+
|      name| hair|  eye|
+----------+-----+-----+
|     James|black|brown|
|   Michael|brown| null|
|    Robert|  red|black|
|Washington| grey| grey|
| Jefferson|brown|     |
+----------+-----+-----+



In [276]:
df.withColumn("hair", df.properties.getItem('hair')).show()

+----------+--------------------+-----+
|      name|          properties| hair|
+----------+--------------------+-----+
|     James|{eye -> brown, ha...|black|
|   Michael|{eye -> null, hai...|brown|
|    Robert|{eye -> black, ha...|  red|
|Washington|{eye -> grey, hai...| grey|
| Jefferson|{eye -> , hair ->...|brown|
+----------+--------------------+-----+



In [281]:
from pyspark.sql.functions import explode, map_keys, map_values

In [278]:
df.select(df.name, explode(df.properties)).show()

+----------+----+-----+
|      name| key|value|
+----------+----+-----+
|     James| eye|brown|
|     James|hair|black|
|   Michael| eye| null|
|   Michael|hair|brown|
|    Robert| eye|black|
|    Robert|hair|  red|
|Washington| eye| grey|
|Washington|hair| grey|
| Jefferson| eye|     |
| Jefferson|hair|brown|
+----------+----+-----+



In [280]:
df.select(df.name, map_keys(df.properties)).show()

+----------+--------------------+
|      name|map_keys(properties)|
+----------+--------------------+
|     James|         [eye, hair]|
|   Michael|         [eye, hair]|
|    Robert|         [eye, hair]|
|Washington|         [eye, hair]|
| Jefferson|         [eye, hair]|
+----------+--------------------+



In [282]:
data=[["1","2020-02-01"],["2","2019-03-01"],["3","2021-03-01"]]
df=spark.createDataFrame(data,["id","input"])
df.show()

+---+----------+
| id|     input|
+---+----------+
|  1|2020-02-01|
|  2|2019-03-01|
|  3|2021-03-01|
+---+----------+



In [283]:
from pyspark.sql.functions import *

In [284]:
df.select(current_date().alias("currente_date")).show()

+-------------+
|currente_date|
+-------------+
|   2024-02-22|
|   2024-02-22|
|   2024-02-22|
+-------------+



In [286]:
df.select(col("input"),
          date_format(col("input"), "MM-dd-yyyy").alias("date_format")).show()

+----------+-----------+
|     input|date_format|
+----------+-----------+
|2020-02-01| 02-01-2020|
|2019-03-01| 03-01-2019|
|2021-03-01| 03-01-2021|
+----------+-----------+



In [287]:
df.select(col("input"),
          datediff(current_date(), col('input')).alias("datediff")).show()

+----------+--------+
|     input|datediff|
+----------+--------+
|2020-02-01|    1482|
|2019-03-01|    1819|
|2021-03-01|    1088|
+----------+--------+



In [288]:
df.select(col('input'), months_between(current_date(), col("input")).alias("months_between")).show()

+----------+--------------+
|     input|months_between|
+----------+--------------+
|2020-02-01|   48.67741935|
|2019-03-01|   59.67741935|
|2021-03-01|   35.67741935|
+----------+--------------+



In [292]:
df.select(col("input"),
          trunc(col("input"), "Month").alias("Month_trunc")).show()

+----------+-----------+
|     input|Month_trunc|
+----------+-----------+
|2020-02-01| 2020-02-01|
|2019-03-01| 2019-03-01|
|2021-03-01| 2021-03-01|
+----------+-----------+



In [293]:
df.select(col("input"),
     year(col("input")).alias("year")).show()

+----------+----+
|     input|year|
+----------+----+
|2020-02-01|2020|
|2019-03-01|2019|
|2021-03-01|2021|
+----------+----+



In [294]:
df.select(col("input"),
     dayofweek(col("input")).alias("dayofweek")).show()

+----------+---------+
|     input|dayofweek|
+----------+---------+
|2020-02-01|        7|
|2019-03-01|        6|
|2021-03-01|        2|
+----------+---------+



In [295]:
jsonString = """{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}"""

df = spark.createDataFrame([(1, jsonString)], ["id","value"])

df.show(truncate=False)

+---+--------------------------------------------------------------------------+
|id |value                                                                     |
+---+--------------------------------------------------------------------------+
|1  |{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+--------------------------------------------------------------------------+



In [296]:
from pyspark.sql.types import MapType, StringType


In [297]:
df2 = df.withColumn("value", from_json(df.value, MapType(StringType(), StringType())))


In [299]:
df2.show(truncate = False)

+---+---------------------------------------------------------------------------+
|id |value                                                                      |
+---+---------------------------------------------------------------------------+
|1  |{Zipcode -> 704, ZipCodeType -> STANDARD, City -> PARC PARQUE, State -> PR}|
+---+---------------------------------------------------------------------------+



In [300]:
from pyspark.sql.functions import to_json, col

In [302]:
df2.withColumn("value", to_json(col("value"))).show(truncate = False)

+---+----------------------------------------------------------------------------+
|id |value                                                                       |
+---+----------------------------------------------------------------------------+
|1  |{"Zipcode":"704","ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+----------------------------------------------------------------------------+



Exercie pyspark

In [303]:
spark

In [315]:
!tar -xvf "worldcitiespop.txt.gz" -C 'content/celle'

tar: This does not look like a tar archive
tar: Skipping to next header
tar: Exiting with failure status due to previous errors


In [304]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

In [330]:
sc = SparkContext.getOrCreate()

In [338]:
rddStr = sc.textFile("worldcitiespop.txt", use_unicode="utf-8")

In [328]:
from sh import gunzip

In [329]:
gunzip('worldcitiespop.txt.gz', )

''

In [339]:
rddStr.take(5)

['Country,City,AccentCity,Region,Population,Latitude,Longitude',
 'ad,aixas,Aix�s,06,,42.4833333,1.4666667',
 'ad,aixirivali,Aixirivali,06,,42.4666667,1.5',
 'ad,aixirivall,Aixirivall,06,,42.4666667,1.5',
 'ad,aixirvall,Aixirvall,06,,42.4666667,1.5']

In [357]:
df = spark.read.options(header='True', inferSchema='True', delimiter=',').text("worldcitiespop.txt")

In [367]:
df = spark.read.option("header", "True") \
    .option("delimiter", ",") \
    .option("inferSchema", "True") \
    .option("encoding", "utf-8")\
    .csv("worldcitiespop.txt")

df.show(2, truncate=False)

+-------+----------+----------+------+----------+----------+---------+
|Country|City      |AccentCity|Region|Population|Latitude  |Longitude|
+-------+----------+----------+------+----------+----------+---------+
|ad     |aixas     |Aix�s     |06    |null      |42.4833333|1.4666667|
|ad     |aixirivali|Aixirivali|06    |null      |42.4666667|1.5      |
+-------+----------+----------+------+----------+----------+---------+
only showing top 2 rows



In [368]:
df.printSchema()

root
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- AccentCity: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Population: integer (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



In [1]:
spark

NameError: name 'spark' is not defined