In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
data = [(1,'Naman',3000), (2,'Tejal',4000)]

In [0]:
df = spark.createDataFrame(data,['Id','Name','Salary'])

In [0]:
df.show()

+---+-----+------+
| Id| Name|Salary|
+---+-----+------+
|  1|Naman|  3000|
|  2|Tejal|  4000|
+---+-----+------+



In [0]:
df.schema

Out[5]: StructType([StructField('Id', LongType(), True), StructField('Name', StringType(), True), StructField('Salary', LongType(), True)])

In [0]:
df.dtypes

Out[6]: [('Id', 'bigint'), ('Name', 'string'), ('Salary', 'bigint')]

In [0]:
df.columns

Out[7]: ['Id', 'Name', 'Salary']

In [0]:
df.sparkSession

In [0]:
df.rdd.collect()

Out[10]: [Row(Id=1, Name='Naman', Salary=3000), Row(Id=2, Name='Tejal', Salary=4000)]

In [0]:
df.rdd.glom().collect()

Out[11]: [[],
 [],
 [],
 [Row(Id=1, Name='Naman', Salary=3000)],
 [],
 [],
 [],
 [Row(Id=2, Name='Tejal', Salary=4000)]]

In [0]:
df.toJSON().collect()

Out[12]: ['{"Id":1,"Name":"Naman","Salary":3000}',
 '{"Id":2,"Name":"Tejal","Salary":4000}']

In [0]:
type(df.toJSON())

Out[13]: pyspark.rdd.RDD

In [0]:
df.distinct().show()

+---+-----+------+
| Id| Name|Salary|
+---+-----+------+
|  1|Naman|  3000|
|  2|Tejal|  4000|
+---+-----+------+



In [0]:
df.printSchema()

root
 |-- Id: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Salary: long (nullable = true)



In [0]:
df1 = df.withColumn('Id', df.Id.cast('string'))

In [0]:
df1.show()

+---+-----+------+
| Id| Name|Salary|
+---+-----+------+
|  1|Naman|  3000|
|  2|Tejal|  4000|
+---+-----+------+



In [0]:
df1 = df.withColumn('Id', df.Id.cast(IntegerType()))

In [0]:
df1.collect()

Out[27]: [Row(Id=1, Name='Naman', Salary=3000), Row(Id=2, Name='Tejal', Salary=4000)]

In [0]:
df2 = df.withColumn('Salary', df.Salary*2)

In [0]:
df2.show()

+---+-----+------+
| Id| Name|Salary|
+---+-----+------+
|  1|Naman|  6000|
|  2|Tejal|  8000|
+---+-----+------+



In [0]:
df3 = df.withColumn('Country1', lit('India'))

[0;31m---------------------------------------------------------------------------[0m
[0;31mPySparkTypeError[0m                          Traceback (most recent call last)
File [0;32m<command-2155213509604133>:1[0m
[0;32m----> 1[0m df3 [38;5;241m=[39m [43mdf[49m[38;5;241;43m.[39;49m[43mwithColumn[49m[43m([49m[38;5;124;43m'[39;49m[38;5;124;43mCountry[39;49m[38;5;124;43m'[39;49m[43m,[49m[43m [49m[38;5;124;43m'[39;49m[38;5;124;43mIndia[39;49m[38;5;124;43m'[39;49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m[43mkwargs[49m[43m)[49m
[1;32m     49[

In [0]:
df3.show

+---+-----+------+-------+
| Id| Name|Salary|Country|
+---+-----+------+-------+
|  1|Naman|  3000|  India|
|  2|Tejal|  4000|  India|
+---+-----+------+-------+



In [0]:
df4 = df3.withColumns({'Country':lit('USA'), 'Salary': df.Salary*4})

In [0]:
df4.show()

+---+-----+------+-------+
| Id| Name|Salary|Country|
+---+-----+------+-------+
|  1|Naman| 12000|    USA|
|  2|Tejal| 16000|    USA|
+---+-----+------+-------+



In [0]:
df5 = df4.withColumnRenamed('Country', 'Current-Location')

In [0]:
df5.show()

+---+-----+------+----------------+
| Id| Name|Salary|Current-Location|
+---+-----+------+----------------+
|  1|Naman| 12000|             USA|
|  2|Tejal| 16000|             USA|
+---+-----+------+----------------+



In [0]:
# complex data -- Array datatype
data = [(1,'Naman',25,['Hadoop','MongoDB','Spark']),(2,'Tejal',27,['Photoshop','UI-UX','Painting']) ]

In [0]:
schema_array = StructType([StructField('Id',IntegerType()),StructField('Name',StringType()), StructField('Age', IntegerType()), StructField('Skills',ArrayType(StringType()))])

In [0]:
df6 = spark.createDataFrame(data, schema=schema_array)

In [0]:
df6.show(truncate=False)

+---+-----+---+----------------------------+
|Id |Name |Age|Skills                      |
+---+-----+---+----------------------------+
|1  |Naman|25 |[Hadoop, MongoDB, Spark]    |
|2  |Tejal|27 |[Photoshop, UI-UX, Painting]|
+---+-----+---+----------------------------+



In [0]:
df7 = spark.createDataFrame(data)

In [0]:
df7.show()

+---+-----+---+--------------------+
| _1|   _2| _3|                  _4|
+---+-----+---+--------------------+
|  1|Naman| 25|[Hadoop, MongoDB,...|
|  2|Tejal| 27|[Photoshop, UI-UX...|
+---+-----+---+--------------------+



In [0]:
df7.printSchema()

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)
 |-- _4: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [0]:
df8 = df6.withColumn('Best-Skilled At', df6.Skills[0])

In [0]:
df8.show()

+---+-----+---+--------------------+---------------+
| Id| Name|Age|              Skills|Best-Skilled At|
+---+-----+---+--------------------+---------------+
|  1|Naman| 25|[Hadoop, MongoDB,...|         Hadoop|
|  2|Tejal| 27|[Photoshop, UI-UX...|      Photoshop|
+---+-----+---+--------------------+---------------+



In [0]:
df6.withColumn('Best-Skilled At', expr("Skills[0]")).show()

+---+-----+---+--------------------+---------------+
| Id| Name|Age|              Skills|Best-Skilled At|
+---+-----+---+--------------------+---------------+
|  1|Naman| 25|[Hadoop, MongoDB,...|         Hadoop|
|  2|Tejal| 27|[Photoshop, UI-UX...|      Photoshop|
+---+-----+---+--------------------+---------------+



In [0]:
df6.withColumn('Top 2 Skills', array(df6.Skills[0], df6.Skills[1])).show()

+---+-----+---+--------------------+------------------+
| Id| Name|Age|              Skills|      Top 2 Skills|
+---+-----+---+--------------------+------------------+
|  1|Naman| 25|[Hadoop, MongoDB,...| [Hadoop, MongoDB]|
|  2|Tejal| 27|[Photoshop, UI-UX...|[Photoshop, UI-UX]|
+---+-----+---+--------------------+------------------+



In [0]:
df6.withColumn('Player-Skill', array(df6.Name, df6.Skills[1])).show()

+---+-----+---+--------------------+----------------+
| Id| Name|Age|              Skills|    Player-Skill|
+---+-----+---+--------------------+----------------+
|  1|Naman| 25|[Hadoop, MongoDB,...|[Naman, MongoDB]|
|  2|Tejal| 27|[Photoshop, UI-UX...|  [Tejal, UI-UX]|
+---+-----+---+--------------------+----------------+



In [0]:
df6.withColumn('Name-Skill', array(col('Name'), col('Skills')[2], col('Skills')[1])).show(truncate=False)

+---+-----+---+----------------------------+------------------------+
|Id |Name |Age|Skills                      |Name-Skill              |
+---+-----+---+----------------------------+------------------------+
|1  |Naman|25 |[Hadoop, MongoDB, Spark]    |[Naman, Spark, MongoDB] |
|2  |Tejal|27 |[Photoshop, UI-UX, Painting]|[Tejal, Painting, UI-UX]|
+---+-----+---+----------------------------+------------------------+



In [0]:
df6.withColumn('isSkilledInHadoop', array_contains(df6.Skills, 'Hadoop')).show()

+---+-----+---+--------------------+-----------------+
| Id| Name|Age|              Skills|isSkilledInHadoop|
+---+-----+---+--------------------+-----------------+
|  1|Naman| 25|[Hadoop, MongoDB,...|             true|
|  2|Tejal| 27|[Photoshop, UI-UX...|            false|
+---+-----+---+--------------------+-----------------+



In [0]:
df8 = df6.withColumn('Skills', explode(df6.Skills)).show()

+---+-----+---+---------+
| Id| Name|Age|   Skills|
+---+-----+---+---------+
|  1|Naman| 25|   Hadoop|
|  1|Naman| 25|  MongoDB|
|  1|Naman| 25|    Spark|
|  2|Tejal| 27|Photoshop|
|  2|Tejal| 27|    UI-UX|
|  2|Tejal| 27| Painting|
+---+-----+---+---------+



In [0]:
df9 = spark.createDataFrame([(1, "Naman", 'Hadoop,Spark,SQl'),
                       (2, "Tejal", 'UI-UX,Painting')])

In [0]:
df9.show()

+---+-----+----------------+
| _1|   _2|              _3|
+---+-----+----------------+
|  1|Naman|Hadoop,Spark,SQl|
|  2|Tejal|  UI-UX,Painting|
+---+-----+----------------+



In [0]:
df9.withColumn('_3', split(df9._3,',')).show()

+---+-----+--------------------+
| _1|   _2|                  _3|
+---+-----+--------------------+
|  1|Naman|[Hadoop, Spark, SQl]|
|  2|Tejal|   [UI-UX, Painting]|
+---+-----+--------------------+



In [0]:
data_map = [(1,'Naman',{'H.NO':'M-129','Street-Number':6,'Pincode':110032}),
            (2,'Tejal',{'H.NO':'A-124','Street-Number':5,'Pincode':221003})]

In [0]:
df12 = spark.createDataFrame(data_map,schema=['Id','Name','Address'])

In [0]:
df12.show(truncate=False)

+---+-----+------------------------------------------------------+
|Id |Name |Address                                               |
+---+-----+------------------------------------------------------+
|1  |Naman|{Street-Number -> 6, H.NO -> M-129, Pincode -> 110032}|
|2  |Tejal|{Street-Number -> 5, H.NO -> A-124, Pincode -> 221003}|
+---+-----+------------------------------------------------------+



In [0]:
df12.printSchema()

root
 |-- Id: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Address: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [0]:
schema_map = StructType([StructField('Id', LongType()),StructField('Name',StringType()),StructField('Address',MapType(StringType(),StringType()))])

In [0]:
df13 = spark.createDataFrame(data_map, schema_map)

In [0]:
df13.show(truncate=False)

+---+-----+------------------------------------------------------+
|Id |Name |Address                                               |
+---+-----+------------------------------------------------------+
|1  |Naman|{Street-Number -> 6, H.NO -> M-129, Pincode -> 110032}|
|2  |Tejal|{Street-Number -> 5, H.NO -> A-124, Pincode -> 221003}|
+---+-----+------------------------------------------------------+



In [0]:
df13.withColumns({'St.No':df13['Address']['Street-Number'],'Pincode': df13['Address']['Pincode'] }).show(truncate=False)

+---+-----+------------------------------------------------------+-----+-------+
|Id |Name |Address                                               |St.No|Pincode|
+---+-----+------------------------------------------------------+-----+-------+
|1  |Naman|{Street-Number -> 6, H.NO -> M-129, Pincode -> 110032}|6    |110032 |
|2  |Tejal|{Street-Number -> 5, H.NO -> A-124, Pincode -> 221003}|5    |221003 |
+---+-----+------------------------------------------------------+-----+-------+



In [0]:
# MapType Functions 

In [0]:
df13.withColumn('Map-Keys', map_keys(df13.Address)).show()

+---+-----+--------------------+--------------------+
| Id| Name|             Address|            Map-Keys|
+---+-----+--------------------+--------------------+
|  1|Naman|{Street-Number ->...|[Street-Number, H...|
|  2|Tejal|{Street-Number ->...|[Street-Number, H...|
+---+-----+--------------------+--------------------+



In [0]:
df13.withColumn('Map-Values', map_values(df13.Address)).show()

+---+-----+--------------------+------------------+
| Id| Name|             Address|        Map-Values|
+---+-----+--------------------+------------------+
|  1|Naman|{Street-Number ->...|[6, M-129, 110032]|
|  2|Tejal|{Street-Number ->...|[5, A-124, 221003]|
+---+-----+--------------------+------------------+



In [0]:
df13.withColumn('keys',explode(map_keys(df13.Address))).show()

+---+-----+--------------------+-------------+
| Id| Name|             Address|         keys|
+---+-----+--------------------+-------------+
|  1|Naman|{Street-Number ->...|Street-Number|
|  1|Naman|{Street-Number ->...|         H.NO|
|  1|Naman|{Street-Number ->...|      Pincode|
|  2|Tejal|{Street-Number ->...|Street-Number|
|  2|Tejal|{Street-Number ->...|         H.NO|
|  2|Tejal|{Street-Number ->...|      Pincode|
+---+-----+--------------------+-------------+



In [0]:
from pyspark.sql.types import Row

In [0]:
Employee = Row('Id','Name','Age','Salary')

In [0]:
data = [Employee(1,'naman',25,75000), Employee(2,'preeti',24,100000)]

In [0]:
df15 = spark.createDataFrame(data)

In [0]:
df15.show()

+---+------+---+------+
| Id|  Name|Age|Salary|
+---+------+---+------+
|  1| naman| 25| 75000|
|  2|preeti| 24|100000|
+---+------+---+------+



In [0]:
df.printSchema()

root
 |-- Id: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Salary: long (nullable = true)



In [0]:
Employee = Row('Id','Details')

In [0]:
data = [Employee(1,{'Name':'Naman','Age':25}), Employee(2, {'Name':'Preeti', 'Age':23})]

In [0]:
df16 = spark.createDataFrame(data)

In [0]:
df16.show()

+---+--------------------+
| Id|             Details|
+---+--------------------+
|  1|{Age -> 25, Name ...|
|  2|{Age -> 23, Name ...|
+---+--------------------+



In [0]:
df16.rdd.collect()

Out[149]: [Row(Id=1, Details={'Age': '25', 'Name': 'Naman'}),
 Row(Id=2, Details={'Age': '23', 'Name': 'Preeti'})]

In [0]:
data = [Row(1,'Naman',Row('Hadoop','3yrs+'))]

In [0]:
df17 = spark.createDataFrame(data)

In [0]:
df17.show()

+---+-----+---------------+
| _1|   _2|             _3|
+---+-----+---------------+
|  1|Naman|{Hadoop, 3yrs+}|
+---+-----+---------------+



In [0]:
col1 = lit('abcd')

In [0]:
type(col1)

Out[154]: pyspark.sql.column.Column

In [0]:
df16.show()

+---+--------------------+
| Id|             Details|
+---+--------------------+
|  1|{Age -> 25, Name ...|
|  2|{Age -> 23, Name ...|
+---+--------------------+



In [0]:
df15.show()

+---+------+---+------+
| Id|  Name|Age|Salary|
+---+------+---+------+
|  1| naman| 25| 75000|
|  2|preeti| 24|100000|
+---+------+---+------+



In [0]:
df15.withColumn('Salary_updated', when((df15.Salary >=80000), value=df15.Salary*1.5).otherwise(df15.Salary)).show()

+---+------+---+------+--------------+
| Id|  Name|Age|Salary|Salary_updated|
+---+------+---+------+--------------+
|  1| naman| 25| 75000|       75000.0|
|  2|preeti| 24|100000|      150000.0|
+---+------+---+------+--------------+



In [0]:
df15.withColumn('Salary_updated', when((df15.Salary >=80000), value=df15.Salary*1.5).otherwise(df15.Salary)).show()

+---+------+---+------+--------------+
| Id|  Name|Age|Salary|Salary_updated|
+---+------+---+------+--------------+
|  1| naman| 25| 75000|       75000.0|
|  2|preeti| 24|100000|      150000.0|
+---+------+---+------+--------------+



In [0]:
df15.select(expr("max(Salary)")).show()

+-----------+
|max(Salary)|
+-----------+
|     100000|
+-----------+



In [0]:
df15.sort(df15.Salary.desc()).show()

+---+------+---+------+
| Id|  Name|Age|Salary|
+---+------+---+------+
|  2|preeti| 24|100000|
|  1| naman| 25| 75000|
+---+------+---+------+



In [0]:
df15.filter(df15.Name.like("%e_%")).show()

+---+------+---+------+
| Id|  Name|Age|Salary|
+---+------+---+------+
|  2|preeti| 24|100000|
+---+------+---+------+



In [0]:
# filtering
df15.filter('Salary>=8000').show()

+---+------+---+------+
| Id|  Name|Age|Salary|
+---+------+---+------+
|  1| naman| 25| 75000|
|  2|preeti| 24|100000|
+---+------+---+------+

