In [1]:
import findspark
findspark.init()

import pyspark
findspark.find()

'C:\\spark\\spark-3.2.0-bin-hadoop3.2'

In [2]:
# Create SparkSession from builder
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local") \
                    .appName('My App') \
                    .getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x000002D99AD7FCA0>


In [4]:
df = spark.createDataFrame(
    [("Hemanth", 250000), ("Hem", 350000), ("Hemanth Kumar", 210000)],["Name","Salary"])
df.show()


+-------------+------+
|         Name|Salary|
+-------------+------+
|      Hemanth|250000|
|          Hem|350000|
|Hemanth Kumar|210000|
+-------------+------+



In [36]:
arrayArrayData = [
  ("James",[["Java","Scala","C++"],["Spark","Java"]]),
  ("Michael",[["Spark","Java","C++"],["Spark","Java"]]),
  ("Robert",[["CSharp","VB"],["Spark","Python"]])
]

df = spark.createDataFrame(data=arrayArrayData, schema = ['name','subjects'])
df.printSchema()
df.show(truncate=False)


root
 |-- name: string (nullable = true)
 |-- subjects: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

+-------+-----------------------------------+
|name   |subjects                           |
+-------+-----------------------------------+
|James  |[[Java, Scala, C++], [Spark, Java]]|
|Michael|[[Spark, Java, C++], [Spark, Java]]|
|Robert |[[CSharp, VB], [Spark, Python]]    |
+-------+-----------------------------------+



In [17]:

from pyspark.sql.functions import explode
df.select(df.name,explode(df.subjects)).show(truncate=False)


+-------+------------------+
|name   |col               |
+-------+------------------+
|James  |[Java, Scala, C++]|
|James  |[Spark, Java]     |
|Michael|[Spark, Java, C++]|
|Michael|[Spark, Java]     |
|Robert |[CSharp, VB]      |
|Robert |[Spark, Python]   |
+-------+------------------+



In [37]:
df = df.withColumn("explode_cal",explode(df.subjects))
df.show(truncate=False)

+-------+-----------------------------------+------------------+
|name   |subjects                           |explode_cal       |
+-------+-----------------------------------+------------------+
|James  |[[Java, Scala, C++], [Spark, Java]]|[Java, Scala, C++]|
|James  |[[Java, Scala, C++], [Spark, Java]]|[Spark, Java]     |
|Michael|[[Spark, Java, C++], [Spark, Java]]|[Spark, Java, C++]|
|Michael|[[Spark, Java, C++], [Spark, Java]]|[Spark, Java]     |
|Robert |[[CSharp, VB], [Spark, Python]]    |[CSharp, VB]      |
|Robert |[[CSharp, VB], [Spark, Python]]    |[Spark, Python]   |
+-------+-----------------------------------+------------------+



In [38]:
df = df.withColumn("from_cal",explode(df.explode_cal))
df.show(truncate=False)

+-------+-----------------------------------+------------------+--------+
|name   |subjects                           |explode_cal       |from_cal|
+-------+-----------------------------------+------------------+--------+
|James  |[[Java, Scala, C++], [Spark, Java]]|[Java, Scala, C++]|Java    |
|James  |[[Java, Scala, C++], [Spark, Java]]|[Java, Scala, C++]|Scala   |
|James  |[[Java, Scala, C++], [Spark, Java]]|[Java, Scala, C++]|C++     |
|James  |[[Java, Scala, C++], [Spark, Java]]|[Spark, Java]     |Spark   |
|James  |[[Java, Scala, C++], [Spark, Java]]|[Spark, Java]     |Java    |
|Michael|[[Spark, Java, C++], [Spark, Java]]|[Spark, Java, C++]|Spark   |
|Michael|[[Spark, Java, C++], [Spark, Java]]|[Spark, Java, C++]|Java    |
|Michael|[[Spark, Java, C++], [Spark, Java]]|[Spark, Java, C++]|C++     |
|Michael|[[Spark, Java, C++], [Spark, Java]]|[Spark, Java]     |Spark   |
|Michael|[[Spark, Java, C++], [Spark, Java]]|[Spark, Java]     |Java    |
|Robert |[[CSharp, VB], [Spark, Python

In [11]:

from pyspark.sql.functions import flatten
df.select(df.name,flatten(df.subjects)).show(truncate=False)


+-------+-------------------------------+
|name   |flatten(subjects)              |
+-------+-------------------------------+
|James  |[Java, Scala, C++, Spark, Java]|
|Michael|[Spark, Java, C++, Spark, Java]|
|Robert |[CSharp, VB, Spark, Python]    |
+-------+-------------------------------+



In [41]:
df = df.withColumn("allsub",flatten(df.subjects))
df.show(truncate=False)

+-------+-----------------------------------+------------------+--------+-------------------------------+
|name   |subjects                           |explode_cal       |from_cal|allsub                         |
+-------+-----------------------------------+------------------+--------+-------------------------------+
|James  |[[Java, Scala, C++], [Spark, Java]]|[Java, Scala, C++]|Java    |[Java, Scala, C++, Spark, Java]|
|James  |[[Java, Scala, C++], [Spark, Java]]|[Java, Scala, C++]|Scala   |[Java, Scala, C++, Spark, Java]|
|James  |[[Java, Scala, C++], [Spark, Java]]|[Java, Scala, C++]|C++     |[Java, Scala, C++, Spark, Java]|
|James  |[[Java, Scala, C++], [Spark, Java]]|[Spark, Java]     |Spark   |[Java, Scala, C++, Spark, Java]|
|James  |[[Java, Scala, C++], [Spark, Java]]|[Spark, Java]     |Java    |[Java, Scala, C++, Spark, Java]|
|Michael|[[Spark, Java, C++], [Spark, Java]]|[Spark, Java, C++]|Spark   |[Spark, Java, C++, Spark, Java]|
|Michael|[[Spark, Java, C++], [Spark, Java]]|[

In [47]:
df.select("name").distinct().show()

+-------+
|   name|
+-------+
|  James|
|Michael|
| Robert|
+-------+



In [4]:
from pyspark.sql import Row
from pyspark.sql.functions import collect_set
rdd = spark.sparkContext.parallelize([Row(user='Bob', word='hello'),
                                      Row(user='Bob', word='world'),
                                      Row(user='Mary', word='Have'),
                                      Row(user='Mary', word='a'),
                                      Row(user='Mary', word='nice'),
                                      Row(user='Mary', word='day')])
df = spark.createDataFrame(rdd)
group_user = df.groupBy('user').agg(collect_set('word').alias('words'))
sat = group_user.collect()


AttributeError: 'list' object has no attribute 'show'