# Prac 3A

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, ArrayType, StructType, StructField
# Obtain the Spark session for this practical (getOrCreate reuses a live one if present).
spark = (
    SparkSession.builder
    .appName('Examples')
    .getOrCreate()
)

In [15]:
# Column type: array of strings whose elements are declared non-null.
arraycol = ArrayType(elementType=StringType(), containsNull=False)

In [16]:
# Rows: (name, languagesAtSchool, languagesAtWork, currentState, previousState).
data = [
    ("James Smith", ["Java", "Scala", "C++"], ["Spark", "Java"], "OH", "CA"),
    ("Michael Rose", ["Spark", "Java", "C++"], ["Spark", "Java"], "NY", "NJ"),
    ("Robert williams", ["CSharp", "VB"], ["Spark", "Python"], "UT", "NV"),
]

In [17]:
# Explicit schema: a name, two string-array columns and two state columns, all nullable.
schema = StructType([
    StructField(name='name', dataType=StringType(), nullable=True),
    StructField(name='languagesAtSchool', dataType=arraycol, nullable=True),
    StructField(name='languagesAtWork', dataType=arraycol, nullable=True),
    StructField(name='currentState', dataType=StringType(), nullable=True),
    StructField(name='previousState', dataType=StringType(), nullable=True),
])

In [18]:
# Build the DataFrame from the rows and the explicit schema, then inspect both.
df = spark.createDataFrame(data, schema)
df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- languagesAtSchool: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- languagesAtWork: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- currentState: string (nullable = true)
 |-- previousState: string (nullable = true)

+---------------+------------------+---------------+------------+-------------+
|           name| languagesAtSchool|languagesAtWork|currentState|previousState|
+---------------+------------------+---------------+------------+-------------+
|    James Smith|[Java, Scala, C++]|  [Spark, Java]|          OH|           CA|
|   Michael Rose|[Spark, Java, C++]|  [Spark, Java]|          NY|           NJ|
|Robert williams|      [CSharp, VB]|[Spark, Python]|          UT|           NV|
+---------------+------------------+---------------+------------+-------------+



In [19]:
from pyspark.sql.functions import explode
# One output row per (name, language) pair; the exploded column is named "col" by default.
df.select(
    df["name"],
    explode(df["languagesAtSchool"]),
).show()

+---------------+------+
|           name|   col|
+---------------+------+
|    James Smith|  Java|
|    James Smith| Scala|
|    James Smith|   C++|
|   Michael Rose| Spark|
|   Michael Rose|  Java|
|   Michael Rose|   C++|
|Robert williams|CSharp|
|Robert williams|    VB|
+---------------+------+



In [20]:
from pyspark.sql.functions import split
# Split each full name on a single space into an array column.
df.select(
    split(df["name"], " ").alias('nameAsArray')
).show()

+------------------+
|       nameAsArray|
+------------------+
|    [James, Smith]|
|   [Michael, Rose]|
|[Robert, williams]|
+------------------+



In [21]:
from pyspark.sql.functions import array
# Combine the two state columns into a single array column called "States".
df.select(
    df["name"],
    array(df["currentState"], df["previousState"]).alias("States"),
).show()

+---------------+--------+
|           name|  States|
+---------------+--------+
|    James Smith|[OH, CA]|
|   Michael Rose|[NY, NJ]|
|Robert williams|[UT, NV]|
+---------------+--------+



In [22]:
from pyspark.sql.functions import array_contains
# Boolean flag per row: does the school-language array contain "Java"?
df.select(
    df["name"],
    array_contains(df["languagesAtSchool"], "Java").alias('array_contains'),
).show()

+---------------+--------------+
|           name|array_contains|
+---------------+--------------+
|    James Smith|          true|
|   Michael Rose|          true|
|Robert williams|         false|
+---------------+--------------+



# Prac 3B

In [23]:
import pyspark
from pyspark.sql import SparkSession
# Obtain the Spark session for this practical (getOrCreate reuses a live one if present).
spark = (
    SparkSession.builder
    .appName('Exam')
    .getOrCreate()
)

In [24]:
# Column names, in the same order as the fields of each row tuple below.
columns = [
    "name",
    "languagesAtschool",
    "CurrentState",
]
# Rows: (name, list of languages, state code).
data = [
    ("James smith", ["Java", "Scala", "C++"], "CA"),
    ("Michaeal Rose", ["Spark", "Java", "C++"], "NJ"),
    ("Robert Williams", ["CSharp", "VB"], "NV"),
]

In [25]:
# Column types are inferred from the data; only the column names are supplied.
df = spark.createDataFrame(data, columns)
df.printSchema()
df.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- languagesAtschool: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- CurrentState: string (nullable = true)

+---------------+------------------+------------+
|name           |languagesAtschool |CurrentState|
+---------------+------------------+------------+
|James smith    |[Java, Scala, C++]|CA          |
|Michaeal Rose  |[Spark, Java, C++]|NJ          |
|Robert Williams|[CSharp, VB]      |NV          |
+---------------+------------------+------------+



In [26]:
from pyspark.sql.functions import col, concat_ws
# Collapse the string array into one comma-separated string, replacing the
# column in place. The source column is referenced by its exact declared name
# ("languagesAtschool") so the lookup does not depend on Spark's default
# case-insensitive column resolution.
df2 = df.withColumn(
    "languagesAtschool",
    concat_ws(",", col("languagesAtschool")),
)
df2.printSchema()
df2.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- languagesAtschool: string (nullable = false)
 |-- CurrentState: string (nullable = true)

+---------------+-----------------+------------+
|name           |languagesAtschool|CurrentState|
+---------------+-----------------+------------+
|James smith    |Java,Scala,C++   |CA          |
|Michaeal Rose  |Spark,Java,C++   |NJ          |
|Robert Williams|CSharp,VB        |NV          |
+---------------+-----------------+------------+



In [27]:
# Register df as the temp view ARRAY_STRING and join the array column into a
# comma-separated string with a SQL query.
df.createOrReplaceTempView("ARRAY_STRING")
spark.sql(
    "select name,concat_ws(',',languagesAtSchool) as languagesAtSchool,currentstate from ARRAY_STRING"
).show(truncate=False)

+---------------+-----------------+------------+
|name           |languagesAtSchool|currentstate|
+---------------+-----------------+------------+
|James smith    |Java,Scala,C++   |CA          |
|Michaeal Rose  |Spark,Java,C++   |NJ          |
|Robert Williams|CSharp,VB        |NV          |
+---------------+-----------------+------------+



# Prac 3C

In [28]:
from pyspark.sql import SparkSession
# Obtain the Spark session for this practical (getOrCreate reuses a live one if present).
spark = (
    SparkSession.builder
    .appName("Exam")
    .getOrCreate()
)

In [29]:
# Word list (with repeats) used as input for the RDD map example.
data = [
    "Project", "Gutenberg's", "Alice's", "Adventures", "in", "Wonderland",
    "Project", "Gutenberg's", "Adventures", "in", "Wonderland",
    "Project", "Gutenberg's",
]

In [30]:
# Turn the local Python list `data` into an RDD via the session's SparkContext.
rdd = spark.sparkContext.parallelize(data)

In [31]:
# Pair every word with the constant 1, then pull all pairs back to the driver
# and print them one per line.
rdd2 = rdd.map(lambda word: (word, 1))
for element in rdd2.collect():
  print(element)

('Project', 1)
("Gutenberg's", 1)
("Alice's", 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
("Gutenberg's", 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
("Gutenberg's", 1)


In [32]:
# Rows: (firstname, lastname, gender, salary).
data = [
    ("James", "Smith", "M", 30),
    ("Anna", "Rose", "F", 41),
    ("Robert", "Williams", "M", 62),
]

In [33]:
# Column names, in the same order as the fields of each row tuple.
columns = [
    "firstname",
    "lastname",
    "gender",
    "salary",
]
df = spark.createDataFrame(data, columns)
df.show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|    30|
|     Anna|    Rose|     F|    41|
|   Robert|Williams|     M|    62|
+---------+--------+------+------+



In [34]:
# Positional field access: join first and last name with a comma, keep the
# gender, and double the salary.
rdd2 = df.rdd.map(
    lambda row: (row[0] + "," + row[1], row[2], row[3] * 2)
)

# Back to a DataFrame with new column names.
df2 = rdd2.toDF(["name", "gender", "new_salary"])
df2.show()

+---------------+------+----------+
|           name|gender|new_salary|
+---------------+------+----------+
|    James,Smith|     M|        60|
|      Anna,Rose|     F|        82|
|Robert,Williams|     M|       124|
+---------------+------+----------+



In [35]:
# Join first and last name and double the salary, reading each Row field
# by its column-name key.
rdd2 = df.rdd.map(
    lambda row: (
        row['firstname'] + "," + row["lastname"],
        row["gender"],
        row["salary"] * 2,
    )
)

In [36]:
# Join first and last name and double the salary, reading each Row field
# as an attribute.
rdd2 = df.rdd.map(
    lambda row: (
        row.firstname + "," + row.lastname,
        row.gender,
        row.salary * 2,
    )
)

In [37]:
def func1(x):
  """Build a (name, gender, salary) tuple from one row-like record.

  Args:
    x: object exposing ``firstname``, ``lastname``, ``gender`` and
      ``salary`` attributes.

  Returns:
    A tuple of "firstname,lastname", the lower-cased gender, and the
    salary multiplied by two.
  """
  full_name = x.firstname + "," + x.lastname
  return (full_name, x.gender.lower(), x.salary * 2)

In [38]:
# Apply func1 to every Row; the function is passed directly instead of
# being wrapped in a lambda that only forwards its argument.
rdd2 = df.rdd.map(func1)

# Prac 3D

In [39]:
from pyspark.sql import SparkSession
# Obtain the Spark session for this practical (getOrCreate reuses a live one if present).
spark = (
    SparkSession.builder
    .appName("Exam")
    .getOrCreate()
)

In [40]:
# Rows: (name, dict of properties). One 'eye' value is None and one is an
# empty string.
dataDictionary = [
    ("James", {"hair": "black", "eye": "brown"}),
    ("Michael", {"hair": "brown", "eye": None}),
    ("Robert", {"hair": "red", "eye": "black"}),
    ("Washington", {"hair": "grey", "eye": "grey"}),
    ("Jefferson", {"hair": "brown", "eye": ""}),
]

In [41]:
# Only column names are given; the Python dicts are inferred as a map column.
df = spark.createDataFrame(dataDictionary, ['name', 'properties'])
df.printSchema()
df.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+-----------------------------+
|name      |properties                   |
+----------+-----------------------------+
|James     |{eye -> brown, hair -> black}|
|Michael   |{eye -> NULL, hair -> brown} |
|Robert    |{eye -> black, hair -> red}  |
|Washington|{eye -> grey, hair -> grey}  |
|Jefferson |{eye -> , hair -> brown}     |
+----------+-----------------------------+



In [42]:
# Turn the map column into plain 'hair' and 'eye' columns by going through the RDD API.
df3 = (
    df.rdd
    .map(lambda row: (row.name, row.properties['hair'], row.properties['eye']))
    .toDF(['name', 'hair', 'eye'])
)
df3.printSchema()
df3.show()

root
 |-- name: string (nullable = true)
 |-- hair: string (nullable = true)
 |-- eye: string (nullable = true)

+----------+-----+-----+
|      name| hair|  eye|
+----------+-----+-----+
|     James|black|brown|
|   Michael|brown| NULL|
|    Robert|  red|black|
|Washington| grey| grey|
| Jefferson|brown|     |
+----------+-----+-----+



In [43]:
# Pull each map entry into its own column with getItem(), then drop the map column.
(
    df.withColumn("hair", df.properties.getItem("hair"))
    .withColumn("eye", df.properties.getItem("eye"))
    .drop("properties")
    .show()
)

+----------+-----+-----+
|      name| hair|  eye|
+----------+-----+-----+
|     James|black|brown|
|   Michael|brown| NULL|
|    Robert|  red|black|
|Washington| grey| grey|
| Jefferson|brown|     |
+----------+-----+-----+



In [44]:
# Pull each map entry into its own column using bracket indexing on the map
# column, then drop the map column.
(
    df.withColumn("hair", df.properties["hair"])
    .withColumn("eye", df.properties["eye"])
    .drop("properties")
    .show()
)

+----------+-----+-----+
|      name| hair|  eye|
+----------+-----+-----+
|     James|black|brown|
|   Michael|brown| NULL|
|    Robert|  red|black|
|Washington| grey| grey|
| Jefferson|brown|     |
+----------+-----+-----+



In [45]:
# functions
from pyspark.sql.functions import explode, map_keys, col
# Discover every distinct key used in the `properties` map across all rows.
keysDF = df.select(explode(map_keys(df.properties))).distinct()
# distinct() gives no ordering guarantee, so sort the collected keys to keep
# the generated column order stable from run to run. Collecting the DataFrame
# directly avoids a needless conversion to an RDD.
keysList = sorted(row[0] for row in keysDF.collect())
# One aliased column expression per key, pulling that key's value out of the map.
keyCols = [col("properties").getItem(key).alias(str(key)) for key in keysList]

In [46]:
# Show the name next to one column per key found in the map.
df.select(df["name"], *keyCols).show()

+----------+-----+-----+
|      name|  eye| hair|
+----------+-----+-----+
|     James|brown|black|
|   Michael| NULL|brown|
|    Robert|black|  red|
|Washington| grey| grey|
| Jefferson|     |brown|
+----------+-----+-----+



# Prac 3E

In [47]:
import pyspark
from pyspark.sql import SparkSession
# Obtain the Spark session for this practical (getOrCreate reuses a live one if present).
spark = (
    SparkSession.builder
    .appName('pyspark-by-example')
    .getOrCreate()
)

In [48]:
# Rows: (name, list of known languages, dict of properties). Includes a None
# list element, empty strings, a row whose list and dict are both None, and
# an empty dict.
arrayData = [
    ("James", ["Java", "Scala"], {"hair": "black", "eye": "brows"}),
    ("Michael", ["Spark", "Java", None], {"hair": "brown", "eye": None}),
    ("Robert", ["CSharp", ""], {"hair": "red", "eye": ""}),
    ("Washington", None, None),
    ("Jefferson", ["1", "2"], {}),
]

In [49]:
# Only column names are given; array and map column types are inferred from the data.
df = spark.createDataFrame(
    arrayData,
    ['name', 'knownLanguages', 'properties'],
)
df.show()

+----------+-------------------+--------------------+
|      name|     knownLanguages|          properties|
+----------+-------------------+--------------------+
|     James|      [Java, Scala]|{eye -> brows, ha...|
|   Michael|[Spark, Java, NULL]|{eye -> NULL, hai...|
|    Robert|         [CSharp, ]|{eye -> , hair ->...|
|Washington|               NULL|                NULL|
| Jefferson|             [1, 2]|                  {}|
+----------+-------------------+--------------------+



In [50]:
from pyspark.sql.functions import explode
# explode() on the array column: one row per element. A row whose array is
# NULL produces no output rows.
df2 = df.select(
    df["name"],
    explode(df["knownLanguages"]),
)
df2.printSchema()
df2.show()

root
 |-- name: string (nullable = true)
 |-- col: string (nullable = true)

+---------+------+
|     name|   col|
+---------+------+
|    James|  Java|
|    James| Scala|
|  Michael| Spark|
|  Michael|  Java|
|  Michael|  NULL|
|   Robert|CSharp|
|   Robert|      |
|Jefferson|     1|
|Jefferson|     2|
+---------+------+



In [51]:
from pyspark.sql.functions import explode
# explode() on the map column: one row per entry, split into "key" and
# "value" columns. Rows whose map is NULL or empty produce no output rows.
df3 = df.select(
    df["name"],
    explode(df["properties"]),
)
df3.printSchema()
df3.show()

root
 |-- name: string (nullable = true)
 |-- key: string (nullable = false)
 |-- value: string (nullable = true)

+-------+----+-----+
|   name| key|value|
+-------+----+-----+
|  James| eye|brows|
|  James|hair|black|
|Michael| eye| NULL|
|Michael|hair|brown|
| Robert| eye|     |
| Robert|hair|  red|
+-------+----+-----+



In [52]:
from pyspark.sql.functions import explode_outer
# With an array column: unlike explode(), a row whose array is NULL is kept
# and yields a single NULL element.
df.select(df.name, explode_outer(df.knownLanguages)).show()
# With a map column: rows whose map is NULL or empty are kept with a NULL
# key and value.
df.select(df.name, explode_outer(df.properties)).show()


+----------+------+
|      name|   col|
+----------+------+
|     James|  Java|
|     James| Scala|
|   Michael| Spark|
|   Michael|  Java|
|   Michael|  NULL|
|    Robert|CSharp|
|    Robert|      |
|Washington|  NULL|
| Jefferson|     1|
| Jefferson|     2|
+----------+------+

+----------+----+-----+
|      name| key|value|
+----------+----+-----+
|     James| eye|brows|
|     James|hair|black|
|   Michael| eye| NULL|
|   Michael|hair|brown|
|    Robert| eye|     |
|    Robert|hair|  red|
|Washington|NULL| NULL|
| Jefferson|NULL| NULL|
+----------+----+-----+



In [53]:
from pyspark.sql.functions import posexplode
# With an array column: like explode(), plus a "pos" column holding each
# element's index within its array.
df.select(df.name, posexplode(df.knownLanguages)).show()
# With a map column: "pos" is the entry's position, alongside "key" and "value".
df.select(df.name, posexplode(df.properties)).show()

+---------+---+------+
|     name|pos|   col|
+---------+---+------+
|    James|  0|  Java|
|    James|  1| Scala|
|  Michael|  0| Spark|
|  Michael|  1|  Java|
|  Michael|  2|  NULL|
|   Robert|  0|CSharp|
|   Robert|  1|      |
|Jefferson|  0|     1|
|Jefferson|  1|     2|
+---------+---+------+

+-------+---+----+-----+
|   name|pos| key|value|
+-------+---+----+-----+
|  James|  0| eye|brows|
|  James|  1|hair|black|
|Michael|  0| eye| NULL|
|Michael|  1|hair|brown|
| Robert|  0| eye|     |
| Robert|  1|hair|  red|
+-------+---+----+-----+



In [54]:
from pyspark.sql.functions import posexplode_outer
# With an array column: like posexplode(), but a row whose array is NULL is
# kept with NULL position and element.
df.select(df.name, posexplode_outer(df.knownLanguages)).show()

# With a map column: rows whose map is NULL or empty are kept with NULL
# position, key and value.
df.select(df.name, posexplode_outer(df.properties)).show()


+----------+----+------+
|      name| pos|   col|
+----------+----+------+
|     James|   0|  Java|
|     James|   1| Scala|
|   Michael|   0| Spark|
|   Michael|   1|  Java|
|   Michael|   2|  NULL|
|    Robert|   0|CSharp|
|    Robert|   1|      |
|Washington|NULL|  NULL|
| Jefferson|   0|     1|
| Jefferson|   1|     2|
+----------+----+------+

+----------+----+----+-----+
|      name| pos| key|value|
+----------+----+----+-----+
|     James|   0| eye|brows|
|     James|   1|hair|black|
|   Michael|   0| eye| NULL|
|   Michael|   1|hair|brown|
|    Robert|   0| eye|     |
|    Robert|   1|hair|  red|
|Washington|NULL|NULL| NULL|
| Jefferson|NULL|NULL| NULL|
+----------+----+----+-----+



# Prac 3F

In [10]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode,flatten

In [11]:
# Obtain the Spark session for this practical (getOrCreate reuses a live one if present).
spark = (
    SparkSession.builder
    .appName('pyspark-by-examples')
    .getOrCreate()
)

In [55]:
# Rows: (name, list of subject lists), i.e. a nested array per person.
arrayArrayData = [
    ("James", [["Java", "Scala", "C++"], ["Spark", "Java"]]),
    ("Michael", [["Spark", "Java", "C++"], ["Spark", "Java"]]),
    ("Robert", [["CSharp", "VB"], ["Spark", "Python"]]),
]


In [56]:
# Only column names are given; the nested lists are inferred as an array of arrays.
df = spark.createDataFrame(arrayArrayData, ['name', 'subjects'])
df.printSchema()
df.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- subjects: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

+-------+-----------------------------------+
|name   |subjects                           |
+-------+-----------------------------------+
|James  |[[Java, Scala, C++], [Spark, Java]]|
|Michael|[[Spark, Java, C++], [Spark, Java]]|
|Robert |[[CSharp, VB], [Spark, Python]]    |
+-------+-----------------------------------+



In [None]:
# explode() unpacks only the outer array, so each output row carries one
# inner array of subjects.
df.select(df.name, explode(df.subjects)).show(truncate=False)

# flatten() creates a single array from an array of arrays.
df.select(df.name, flatten(df.subjects)).show(truncate=False)