In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("local").getOrCreate()

In [2]:
data = [("James","Smith","USA","CA"),("Michael","Rose","USA","NY"), \
    ("Robert","Williams","USA","CA"),("Maria","Jones","USA","FL") \
  ]

columns = ["firstname","lastname","country","state"]

In [3]:
df = spark.createDataFrame(data=data, schema=columns)
df.show()

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|    James|   Smith|    USA|   CA|
|  Michael|    Rose|    USA|   NY|
|   Robert|Williams|    USA|   CA|
|    Maria|   Jones|    USA|   FL|
+---------+--------+-------+-----+



In [4]:
df.collect()

[Row(firstname='James', lastname='Smith', country='USA', state='CA'),
 Row(firstname='Michael', lastname='Rose', country='USA', state='NY'),
 Row(firstname='Robert', lastname='Williams', country='USA', state='CA'),
 Row(firstname='Maria', lastname='Jones', country='USA', state='FL')]

In [5]:
spark.createDataFrame(df.collect()).show()

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|    James|   Smith|    USA|   CA|
|  Michael|    Rose|    USA|   NY|
|   Robert|Williams|    USA|   CA|
|    Maria|   Jones|    USA|   FL|
+---------+--------+-------+-----+



In [7]:
state1 = df.rdd.map(lambda x: x[3]).collect()
state1

['CA', 'NY', 'CA', 'FL']

In [11]:
df.rdd.map(lambda x : x.state).collect()

['CA', 'NY', 'CA', 'FL']

In [15]:
df.select('state').rdd.map(lambda x: x.state).collect()

['CA', 'NY', 'CA', 'FL']

In [17]:
df.select('state').toPandas()['state'].tolist()

['CA', 'NY', 'CA', 'FL']

In [43]:
from pyspark.sql.types import StringType, StructType, StructField, DateType
from pyspark.sql.functions import to_date
from pyspark.sql import Row

# Sample data with a string column representing dates
data = [("1/12/2023",), ("2/15/2023",), ("3/20/2023",)]
columns = ["date_str"]

df = spark.createDataFrame(data, columns)

# Convert the string column to a DateType
df = df.withColumn("date", to_date("date_str", "M/d/yyyy").cast(DateType()))

df.show()

+---------+----------+
| date_str|      date|
+---------+----------+
|1/12/2023|2023-01-12|
|2/15/2023|2023-02-15|
|3/20/2023|2023-03-20|
+---------+----------+



In [23]:
import pandas as pd

data = [['Scott', 50], ['Jeff', 45], ['Thomas', 54],['Ann',34]] 
  
# Create the pandas DataFrame 
pandasDF = pd.DataFrame(data, columns = ['Name', 'Age']) 

pandasDF.head()

Unnamed: 0,Name,Age
0,Scott,50
1,Jeff,45
2,Thomas,54
3,Ann,34


In [25]:
df = spark.createDataFrame(pandasDF)
df.show()

+------+---+
|  Name|Age|
+------+---+
| Scott| 50|
|  Jeff| 45|
|Thomas| 54|
|   Ann| 34|
+------+---+



In [27]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)



In [35]:
from pyspark.sql.types import StringType, StructField, IntegerType, DateType

schema = StructType([
    StructField('Name', StringType(), True),
    StructField('Age', IntegerType(), True)
])

df = spark.createDataFrame(pandasDF, schema=schema)
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)



In [30]:
pandasDF2=df.select("*").toPandas
print(pandasDF2)

<bound method PandasConversionMixin.toPandas of DataFrame[Name: string, Age: int]>


In [31]:
test=spark.conf.get("spark.sql.execution.arrow.enabled")
print(test)

test123=spark.conf.get("spark.sql.execution.arrow.pyspark.fallback.enabled")
print(test123)

false
true


In [34]:
spark.conf.set("spark.sql.execution.arrow.enabled","false")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")

pandasDF2=df.select("*").toPandas
print(pandasDF2)

<bound method PandasConversionMixin.toPandas of DataFrame[Name: string, Age: int]>


In [58]:
from pyspark.sql.functions import col,expr
data=[("2019-01-23",1),("2019-06-24",2),("2019-09-20",3)]

In [63]:
spark.createDataFrame(data).toDF("date","increment").select(col('date'),col('increment'),expr("add_months(to_date(date,'yyyy-MM-dd'),cast(increment as int))").alias("inc_date")).show()

+----------+---------+----------+
|      date|increment|  inc_date|
+----------+---------+----------+
|2019-01-23|        1|2019-02-23|
|2019-06-24|        2|2019-08-24|
|2019-09-20|        3|2019-12-20|
+----------+---------+----------+



In [65]:
from pyspark.sql.functions import add_months, to_date, 

In [73]:
spark.createDataFrame(data,schema=["date","increment"]).select(['date','increment',add_months(to_date('date'),'increment').alias("inc_date")]).show()

+----------+---------+----------+
|      date|increment|  inc_date|
+----------+---------+----------+
|2019-01-23|        1|2019-02-23|
|2019-06-24|        2|2019-08-24|
|2019-09-20|        3|2019-12-20|
+----------+---------+----------+



In [74]:
from pyspark.sql.functions import add_months, to_date
data=[("2019-01-23",1),("2019-06-24",2),("2019-09-20",3)]
spark.createDataFrame(data,schema=["date","increment"]).select(['date','increment',add_months(to_date('date'),'increment').alias("inc_date")]).show()

+----------+---------+----------+
|      date|increment|  inc_date|
+----------+---------+----------+
|2019-01-23|        1|2019-02-23|
|2019-06-24|        2|2019-08-24|
|2019-09-20|        3|2019-12-20|
+----------+---------+----------+



In [75]:
data = [('James','Smith','M',3000),
  ('Anna','Rose','F',4100),
  ('Robert','Williams','M',6200), 
]

columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|  3000|
|     Anna|    Rose|     F|  4100|
|   Robert|Williams|     M|  6200|
+---------+--------+------+------+

aa


In [76]:
df.columns

['firstname', 'lastname', 'gender', 'salary']

In [77]:
# Add new constanct column
from pyspark.sql.functions import lit
df.withColumn("bonus_percent", lit(0.3)) \
  .show()

+---------+--------+------+------+-------------+
|firstname|lastname|gender|salary|bonus_percent|
+---------+--------+------+------+-------------+
|    James|   Smith|     M|  3000|          0.3|
|     Anna|    Rose|     F|  4100|          0.3|
|   Robert|Williams|     M|  6200|          0.3|
+---------+--------+------+------+-------------+



In [78]:
df.withColumn("bonus_amount",df.salary*0.3).show()

+---------+--------+------+------+------------+
|firstname|lastname|gender|salary|bonus_amount|
+---------+--------+------+------+------------+
|    James|   Smith|     M|  3000|       900.0|
|     Anna|    Rose|     F|  4100|      1230.0|
|   Robert|Williams|     M|  6200|      1860.0|
+---------+--------+------+------+------------+



In [79]:
from pyspark.sql.functions import concat_ws
df.withColumn("name", concat_ws(",","firstname",'lastname')) \
  .show()

+---------+--------+------+------+---------------+
|firstname|lastname|gender|salary|           name|
+---------+--------+------+------+---------------+
|    James|   Smith|     M|  3000|    James,Smith|
|     Anna|    Rose|     F|  4100|      Anna,Rose|
|   Robert|Williams|     M|  6200|Robert,Williams|
+---------+--------+------+------+---------------+



In [80]:
#Add current date
from pyspark.sql.functions import current_date
df.withColumn("current_date", current_date()) \
  .show()

+---------+--------+------+------+------------+
|firstname|lastname|gender|salary|current_date|
+---------+--------+------+------+------------+
|    James|   Smith|     M|  3000|  2024-02-21|
|     Anna|    Rose|     F|  4100|  2024-02-21|
|   Robert|Williams|     M|  6200|  2024-02-21|
+---------+--------+------+------+------------+



In [87]:
from pyspark.sql.functions import when

df.withColumn("grade",
             when((df.salary<4000), lit('A')).when((df.salary>=4000) & (df.salary<=5000),lit('B')).otherwise(lit('C'))).show()

+---------+--------+------+------+-----+
|firstname|lastname|gender|salary|grade|
+---------+--------+------+------+-----+
|    James|   Smith|     M|  3000|    A|
|     Anna|    Rose|     F|  4100|    B|
|   Robert|Williams|     M|  6200|    C|
+---------+--------+------+------+-----+



In [88]:
# Add column using select
df.select("firstname","salary", lit(0.3).alias("bonus")).show()
df.select("firstname","salary", lit(df.salary * 0.3).alias("bonus_amount")).show()
df.select("firstname","salary", current_date().alias("today_date")).show()

+---------+------+-----+
|firstname|salary|bonus|
+---------+------+-----+
|    James|  3000|  0.3|
|     Anna|  4100|  0.3|
|   Robert|  6200|  0.3|
+---------+------+-----+

+---------+------+------------+
|firstname|salary|bonus_amount|
+---------+------+------------+
|    James|  3000|       900.0|
|     Anna|  4100|      1230.0|
|   Robert|  6200|      1860.0|
+---------+------+------------+

+---------+------+----------+
|firstname|salary|today_date|
+---------+------+----------+
|    James|  3000|2024-02-21|
|     Anna|  4100|2024-02-21|
|   Robert|  6200|2024-02-21|
+---------+------+----------+



In [89]:
simpleData = [("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  ]
schema = ["employee_name", "department", "salary"]
  
df = spark.createDataFrame(data=simpleData, schema = schema)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [93]:
from pyspark.sql.functions import approx_count_distinct,collect_list
from pyspark.sql.functions import collect_set,sum,avg,max,countDistinct,count
from pyspark.sql.functions import first, last, kurtosis, min, mean, skewness 
from pyspark.sql.functions import stddev, stddev_samp, stddev_pop, sumDistinct
from pyspark.sql.functions import variance,var_samp,  var_pop

In [94]:
print("approx_count_distinct: " + \
      str(df.select(approx_count_distinct("salary")).collect()[0][0]))

approx_count_distinct: 6


In [98]:
df.select(approx_count_distinct("salary")).collect()[0]

Row(approx_count_distinct(salary)=6)

In [99]:
print("avg: " + str(df.select(avg("salary")).collect()[0][0]))

avg: 3400.0


In [106]:
df.select(avg('salary')).collect()[0][0]

3400.0

In [108]:
df.select(avg('salary')).show()

+-----------+
|avg(salary)|
+-----------+
|     3400.0|
+-----------+



In [109]:
simpleData = [("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  ]
schema = ["employee_name", "department", "salary"]
  
  
df = spark.createDataFrame(data=simpleData, schema = schema)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [111]:
df.select(approx_count_distinct('department')).show()

+---------------------------------+
|approx_count_distinct(department)|
+---------------------------------+
|                                3|
+---------------------------------+



In [115]:
df.select(collect_list("department")).show(truncate=False)

+------------------------------------------------------------------------------------+
|collect_list(department)                                                            |
+------------------------------------------------------------------------------------+
|[Sales, Sales, Sales, Finance, Sales, Finance, Finance, Marketing, Marketing, Sales]|
+------------------------------------------------------------------------------------+



In [116]:
df.select(collect_set("salary")).show(truncate=False)

+------------------------------------+
|collect_set(salary)                 |
+------------------------------------+
|[4600, 3000, 3900, 4100, 3300, 2000]|
+------------------------------------+



In [117]:
df.select(countDistinct("department", "salary")).show(truncate=False)

+----------------------------------+
|count(DISTINCT department, salary)|
+----------------------------------+
|8                                 |
+----------------------------------+



In [118]:
columns = ["name","languagesAtSchool","currentState"]
data = [("James,,Smith",["Java","Scala","C++"],"CA"), \
    ("Michael,Rose,",["Spark","Java","C++"],"NJ"), \
    ("Robert,,Williams",["CSharp","VB"],"NV")]

df = spark.createDataFrame(data=data,schema=columns)
df.printSchema()
df.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- languagesAtSchool: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- currentState: string (nullable = true)

+----------------+------------------+------------+
|name            |languagesAtSchool |currentState|
+----------------+------------------+------------+
|James,,Smith    |[Java, Scala, C++]|CA          |
|Michael,Rose,   |[Spark, Java, C++]|NJ          |
|Robert,,Williams|[CSharp, VB]      |NV          |
+----------------+------------------+------------+



In [119]:
df.withColumn('languages_school', concat_ws(",","languagesAtSchool")).show()

+----------------+------------------+------------+----------------+
|            name| languagesAtSchool|currentState|languages_school|
+----------------+------------------+------------+----------------+
|    James,,Smith|[Java, Scala, C++]|          CA|  Java,Scala,C++|
|   Michael,Rose,|[Spark, Java, C++]|          NJ|  Spark,Java,C++|
|Robert,,Williams|      [CSharp, VB]|          NV|       CSharp,VB|
+----------------+------------------+------------+----------------+



In [121]:
columns = ["name","languagesAtSchool","currentState"]
data = [("James,,Smith",["Java","Scala","C++"],"CA"), \
    ("Michael,Rose,",["Spark","Java","C++"],"NJ"), \
    ("Robert,,Williams",["CSharp","VB"],"NV")]

df = spark.createDataFrame(data,columns)

df.select('name','currentState',concat_ws(',','languagesAtSchool')).show()

+----------------+------------+-------------------------------+
|            name|currentState|concat_ws(,, languagesAtSchool)|
+----------------+------------+-------------------------------+
|    James,,Smith|          CA|                 Java,Scala,C++|
|   Michael,Rose,|          NJ|                 Spark,Java,C++|
|Robert,,Williams|          NV|                      CSharp,VB|
+----------------+------------+-------------------------------+



In [123]:
from pyspark.sql.types import ArrayType
data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"],"OH","CA"),
 ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"],"NY","NJ"),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"],"UT","NV")
]

schema = StructType([ 
    StructField("name",StringType(),True), 
    StructField("languagesAtSchool",ArrayType(StringType()),True), 
    StructField("languagesAtWork",ArrayType(StringType()),True), 
    StructField("currentState", StringType(), True), 
    StructField("previousState", StringType(), True) 
  ])

df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- languagesAtSchool: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- languagesAtWork: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- currentState: string (nullable = true)
 |-- previousState: string (nullable = true)

+----------------+------------------+---------------+------------+-------------+
|            name| languagesAtSchool|languagesAtWork|currentState|previousState|
+----------------+------------------+---------------+------------+-------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|          OH|           CA|
|   Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|          NY|           NJ|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|          UT|           NV|
+----------------+------------------+---------------+------------+-------------+



In [125]:
from pyspark.sql.functions import explode,split
df.select('name', explode('languagesAtSchool')).show()

+----------------+------+
|            name|   col|
+----------------+------+
|    James,,Smith|  Java|
|    James,,Smith| Scala|
|    James,,Smith|   C++|
|   Michael,Rose,| Spark|
|   Michael,Rose,|  Java|
|   Michael,Rose,|   C++|
|Robert,,Williams|CSharp|
|Robert,,Williams|    VB|
+----------------+------+



In [126]:
df.select(split("name",",")).show()

+--------------------+
|  split(name, ,, -1)|
+--------------------+
|    [James, , Smith]|
|   [Michael, Rose, ]|
|[Robert, , Williams]|
+--------------------+



In [127]:
from pyspark.sql.functions import array
df.select(df.name,array(df.currentState,df.previousState).alias("States")).show()

+----------------+--------+
|            name|  States|
+----------------+--------+
|    James,,Smith|[OH, CA]|
|   Michael,Rose,|[NY, NJ]|
|Robert,,Williams|[UT, NV]|
+----------------+--------+



In [128]:
from pyspark.sql.functions import array_contains
df.select(df.name,array_contains(df.languagesAtSchool,"Java")
    .alias("array_contains")).show()

+----------------+--------------+
|            name|array_contains|
+----------------+--------------+
|    James,,Smith|          true|
|   Michael,Rose,|          true|
|Robert,,Williams|         false|
+----------------+--------------+

