In [27]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [28]:
from pyspark.sql import SparkSession

In [29]:
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate() 

In [30]:
from pyspark.sql import Row
row = Row('James',40)
print(row[0]+","+str(row[1]))

James,40


In [31]:
row = Row(name='Alice' , age = 11)
print(row)

Row(name='Alice', age=11)


In [32]:
print(row.name , row.age)

Alice 11


In [33]:
person = Row("name" , "age")
p1 = person('James' , 40)
p2 = person('Alice',11)
print(p1.name) 
print(p2.name)

James
Alice


In [34]:
data = [Row(name='James,,Smith' , lang=['Java' , 'Scala' , 'C++'] , state='CA'),
        Row(name='Micheal,Rose,',lang=['Spark','Java','C++'],state='NJ'),
        Row(name='Robert,Langdon,Halstead',lang=['CSharp',"VB"] , state="NV")]

rdd = spark.sparkContext.parallelize(data)
print(rdd.collect())      

[Row(name='James,,Smith', lang=['Java', 'Scala', 'C++'], state='CA'), Row(name='Micheal,Rose,', lang=['Spark', 'Java', 'C++'], state='NJ'), Row(name='Robert,Langdon,Halstead', lang=['CSharp', 'VB'], state='NV')]


In [35]:
collData=rdd.collect()
for row in collData:
  print(row.name + "," + str(row.lang))

James,,Smith,['Java', 'Scala', 'C++']
Micheal,Rose,,['Spark', 'Java', 'C++']
Robert,Langdon,Halstead,['CSharp', 'VB']


In [36]:
person = Row("name" , 'lang' , 'state')
data = [person("James,,Smith" , ['JAVA','scala','SQL'] , 'CA'),
        person("Micheal,Rose," , ['Java','Spark','C++'],'NJ'),
        person("David,Schrute,Langdon" ,['CSharp',"VB"] , 'NV')]
print(data)        


[Row(name='James,,Smith', lang=['JAVA', 'scala', 'SQL'], state='CA'), Row(name='Micheal,Rose,', lang=['Java', 'Spark', 'C++'], state='NJ'), Row(name='David,Schrute,Langdon', lang=['CSharp', 'VB'], state='NV')]


In [37]:
df = spark.createDataFrame(data)
df.show()
df.printSchema()

+--------------------+------------------+-----+
|                name|              lang|state|
+--------------------+------------------+-----+
|        James,,Smith|[JAVA, scala, SQL]|   CA|
|       Micheal,Rose,|[Java, Spark, C++]|   NJ|
|David,Schrute,Lan...|      [CSharp, VB]|   NV|
+--------------------+------------------+-----+

root
 |-- name: string (nullable = true)
 |-- lang: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)



In [38]:
columns = ['name','languageAtSchool','currentState']
df = spark.createDataFrame(data).toDF(*columns)
df.show()
df.printSchema()

+--------------------+------------------+------------+
|                name|  languageAtSchool|currentState|
+--------------------+------------------+------------+
|        James,,Smith|[JAVA, scala, SQL]|          CA|
|       Micheal,Rose,|[Java, Spark, C++]|          NJ|
|David,Schrute,Lan...|      [CSharp, VB]|          NV|
+--------------------+------------------+------------+

root
 |-- name: string (nullable = true)
 |-- languageAtSchool: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- currentState: string (nullable = true)



In [39]:
#Create DataFrame with struct using row class
data = [Row(name='James' , prop=Row(hair='Blonde',eyes='blue')),
        Row(name='Sebastian',prop=Row(hair='Brunette',eyes='Black'))]
df = spark.createDataFrame(data)
df.printSchema()
df.show()          

root
 |-- name: string (nullable = true)
 |-- prop: struct (nullable = true)
 |    |-- hair: string (nullable = true)
 |    |-- eyes: string (nullable = true)

+---------+-----------------+
|     name|             prop|
+---------+-----------------+
|    James|   {Blonde, blue}|
|Sebastian|{Brunette, Black}|
+---------+-----------------+



In [40]:
from pyspark.sql.functions import lit #lit is literal  used to add a new column to DataFrame by assigning a literal or constant value.
colObj = lit('sparkbyexamples.com')
colObj

Column<'sparkbyexamples.com'>

In [41]:
data = [('James',23),('Ann',40)]
df = spark.createDataFrame(data).toDF('name.fname','gender')
df.printSchema()
#root
#|-- name.fname: string (nullable = true)
#|-- gender: long (nullable = true)

#Using DataFrame object(df)
df.select(df['gender']).show()
#Accessing column name with dot (with backticks)
df.select(df["`name.fname`"]).show()

root
 |-- name.fname: string (nullable = true)
 |-- gender: long (nullable = true)

+------+
|gender|
+------+
|    23|
|    40|
+------+

+----------+
|name.fname|
+----------+
|     James|
|       Ann|
+----------+



In [42]:
#Another way to select out columns using col()
from pyspark.sql.functions import col
df.select(col('gender')).show()
#Accessing column name with dot (with backticks)
df.select(col('`name.fname`')).show()

+------+
|gender|
+------+
|    23|
|    40|
+------+

+----------+
|name.fname|
+----------+
|     James|
|       Ann|
+----------+



In [43]:
#Create dataframe with struct using Row class
from pyspark.sql import Row
data = [Row(name='James',prop=Row(hair='black',eyes='Blue')),
        Row(name="Ann",prop=Row(hair='Blonde',eyes='Green'))]
df=spark.createDataFrame(data) 
df.show()

#Access struct column
df.select(df['prop.hair']).show()
df.select(col('prop.hair')).show()

#Access to all columns with struct
df.select(col('prop.*')).show()
df.select(df['prop']).show()

+-----+---------------+
| name|           prop|
+-----+---------------+
|James|  {black, Blue}|
|  Ann|{Blonde, Green}|
+-----+---------------+

+------+
|  hair|
+------+
| black|
|Blonde|
+------+

+------+
|  hair|
+------+
| black|
|Blonde|
+------+

+------+-----+
|  hair| eyes|
+------+-----+
| black| Blue|
|Blonde|Green|
+------+-----+

+---------------+
|           prop|
+---------------+
|  {black, Blue}|
|{Blonde, Green}|
+---------------+



In [44]:
data = [(100,2,1),(200,3,4),(300,4,4)]
df=spark.createDataFrame(data).toDF('col1','col2','col3')
df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
| 100|   2|   1|
| 200|   3|   4|
| 300|   4|   4|
+----+----+----+



In [45]:
#Arithmetic operations
df.select(df.col1 + df.col2).show()

+-------------+
|(col1 + col2)|
+-------------+
|          102|
|          203|
|          304|
+-------------+



In [46]:
df.select(df.col1-df.col2).show()

+-------------+
|(col1 - col2)|
+-------------+
|           98|
|          197|
|          296|
+-------------+



In [47]:
df.select(df.col1 / df.col2).show()

+-----------------+
|    (col1 / col2)|
+-----------------+
|             50.0|
|66.66666666666667|
|             75.0|
+-----------------+



In [48]:
df.select(df.col1 + df.col2).show()
df.select(df.col1 - df.col2).show()
df.select(df.col1 * df.col2).show()
df.select(df.col1 / df.col2).show()

+-------------+
|(col1 + col2)|
+-------------+
|          102|
|          203|
|          304|
+-------------+

+-------------+
|(col1 - col2)|
+-------------+
|           98|
|          197|
|          296|
+-------------+

+-------------+
|(col1 * col2)|
+-------------+
|          200|
|          600|
|         1200|
+-------------+

+-----------------+
|    (col1 / col2)|
+-----------------+
|             50.0|
|66.66666666666667|
|             75.0|
+-----------------+



In [49]:
df.select(df.col1 > df.col2).show()
df.select(df.col2<df.col1).show()
df.select(df.col1 == df.col2).show()

+-------------+
|(col1 > col2)|
+-------------+
|         true|
|         true|
|         true|
+-------------+

+-------------+
|(col2 < col1)|
+-------------+
|         true|
|         true|
|         true|
+-------------+

+-------------+
|(col1 = col2)|
+-------------+
|        false|
|        false|
|        false|
+-------------+



In [50]:
df.select(df.col1 % df.col2).show()

+-------------+
|(col1 % col2)|
+-------------+
|            0|
|            2|
|            0|
+-------------+



In [51]:
from pandas.core.groupby import DataFrameGroupBy
data = [('James','Bond','100',None),
        ('Ann','Varsa','200','F'),
        ('Tom Cruise','XXX','400',''),
        ('Tom Brand',None,'400','M')]
columns = ['Fname','Lname','ID','Gender']
df = spark.createDataFrame(data,columns)
df.show()

+----------+-----+---+------+
|     Fname|Lname| ID|Gender|
+----------+-----+---+------+
|     James| Bond|100|  null|
|       Ann|Varsa|200|     F|
|Tom Cruise|  XXX|400|      |
| Tom Brand| null|400|     M|
+----------+-----+---+------+



In [52]:
#alias function to rename column names
df.select(df.Fname.alias('first_name'),\
          df.Lname.alias('last_name')).show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|     James|     Bond|
|       Ann|    Varsa|
|Tom Cruise|      XXX|
| Tom Brand|     null|
+----------+---------+



In [53]:
#Another example(but here we are merging two columns and then merging them)
from pyspark.sql.functions import expr
df.select(expr("Fname||','||Lname").alias('fullname')).show()

+--------------+
|      fullname|
+--------------+
|    James,Bond|
|     Ann,Varsa|
|Tom Cruise,XXX|
|          null|
+--------------+



In [54]:
#asc and desc to sort in ascending and descending order
df.sort(df.Fname.asc()).show() 
df.sort(df.Fname.desc()).show()

+----------+-----+---+------+
|     Fname|Lname| ID|Gender|
+----------+-----+---+------+
|       Ann|Varsa|200|     F|
|     James| Bond|100|  null|
| Tom Brand| null|400|     M|
|Tom Cruise|  XXX|400|      |
+----------+-----+---+------+

+----------+-----+---+------+
|     Fname|Lname| ID|Gender|
+----------+-----+---+------+
|Tom Cruise|  XXX|400|      |
| Tom Brand| null|400|     M|
|     James| Bond|100|  null|
|       Ann|Varsa|200|     F|
+----------+-----+---+------+



In [55]:
 #cast() & astype() to convert the data Type.
 df.select(df.ID.cast('int')).printSchema()

df.select(df.Fname,df.ID.cast("int")).printSchema()
df.select(df.Fname,df.ID.astype("int")).printSchema()

root
 |-- ID: integer (nullable = true)

root
 |-- Fname: string (nullable = true)
 |-- ID: integer (nullable = true)

root
 |-- Fname: string (nullable = true)
 |-- ID: integer (nullable = true)



In [56]:
#between() to Return a Boolean expression when a column values in between lower and upper bound.
df.filter(df.ID.between(100,300)).show()

+-----+-----+---+------+
|Fname|Lname| ID|Gender|
+-----+-----+---+------+
|James| Bond|100|  null|
|  Ann|Varsa|200|     F|
+-----+-----+---+------+



In [57]:
#contains() to Check if a DataFrame column value contains a a value specified in this function.
df.filter(df.Fname.contains('Tom Cruise')).show()

+----------+-----+---+------+
|     Fname|Lname| ID|Gender|
+----------+-----+---+------+
|Tom Cruise|  XXX|400|      |
+----------+-----+---+------+



In [58]:
# startswith() & endswith()  to check if the value of the DataFrame Column starts and ends with a String respectively.
df.filter(df.Fname.startswith('T')).show()
df.filter(df.Fname.endswith('uise')).show()

+----------+-----+---+------+
|     Fname|Lname| ID|Gender|
+----------+-----+---+------+
|Tom Cruise|  XXX|400|      |
| Tom Brand| null|400|     M|
+----------+-----+---+------+

+----------+-----+---+------+
|     Fname|Lname| ID|Gender|
+----------+-----+---+------+
|Tom Cruise|  XXX|400|      |
+----------+-----+---+------+



In [59]:
#isNull & isNotNull() to check if the DataFrame column has NULL or non NULL values.
df.filter(df.Lname.isNull()).show()
df.filter(df.Lname.isNotNull()).show()

+---------+-----+---+------+
|    Fname|Lname| ID|Gender|
+---------+-----+---+------+
|Tom Brand| null|400|     M|
+---------+-----+---+------+

+----------+-----+---+------+
|     Fname|Lname| ID|Gender|
+----------+-----+---+------+
|     James| Bond|100|  null|
|       Ann|Varsa|200|     F|
|Tom Cruise|  XXX|400|      |
+----------+-----+---+------+



In [60]:
#like() & rlike() – Similar to SQL LIKE expression
#like , rlike

df.select(df.Fname,df.Lname,df.ID).filter(df.Fname.like('%nn')).show()

+-----+-----+---+
|Fname|Lname| ID|
+-----+-----+---+
|  Ann|Varsa|200|
+-----+-----+---+



In [61]:
#substr() – Returns a Column after getting sub string from the Column
df.select(df.Fname.substr(1,2).alias("substr")).show()

+------+
|substr|
+------+
|    Ja|
|    An|
|    To|
|    To|
+------+



In [62]:
#when & otherwise
from pyspark.sql.functions import when
df.select(df.Fname,df.Lname,when(df.Gender=='M','Male')\
          .when(df.Gender=='F','Female')\
          .when(df.Gender==None,'')\
          .otherwise(df.Gender).alias('New_gender')).show()

+----------+-----+----------+
|     Fname|Lname|New_gender|
+----------+-----+----------+
|     James| Bond|      null|
|       Ann|Varsa|    Female|
|Tom Cruise|  XXX|          |
| Tom Brand| null|      Male|
+----------+-----+----------+



In [63]:
#isin
list1 = ['100','200']
df.select(df.Fname,df.Lname,df.ID).filter(df.ID.isin(list1)).show()

+-----+-----+---+
|Fname|Lname| ID|
+-----+-----+---+
|James| Bond|100|
|  Ann|Varsa|200|
+-----+-----+---+



In [64]:

#Create DataFrame with struct, array & map
from pyspark.sql.types import StructType,StructField,StringType,ArrayType,MapType
data=[(("James","Bond"),["Java","C#"],{'hair':'black','eye':'brown'}),
      (("Ann","Varsa"),[".NET","Python"],{'hair':'brown','eye':'black'}),
      (("Tom Cruise",""),["Python","Scala"],{'hair':'red','eye':'grey'}),
      (("Tom Brand",None),["Perl","Ruby"],{'hair':'black','eye':'blue'})]

schema = StructType([
        StructField('name', StructType([
            StructField('fname', StringType(), True),
            StructField('lname', StringType(), True)])),
        StructField('languages', ArrayType(StringType()),True),
        StructField('properties', MapType(StringType(),StringType()),True)
     ])
df=spark.createDataFrame(data,schema)
df.printSchema()
df.show()


root
 |-- name: struct (nullable = true)
 |    |-- fname: string (nullable = true)
 |    |-- lname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+-----------------+---------------+--------------------+
|             name|      languages|          properties|
+-----------------+---------------+--------------------+
|    {James, Bond}|     [Java, C#]|{eye -> brown, ha...|
|     {Ann, Varsa}| [.NET, Python]|{eye -> black, ha...|
|   {Tom Cruise, }|[Python, Scala]|{eye -> grey, hai...|
|{Tom Brand, null}|   [Perl, Ruby]|{eye -> blue, hai...|
+-----------------+---------------+--------------------+



In [65]:
#getField from MapType
df.select(df.properties.getField("hair")).show()

#getField from Struct
df.select(df.name.getField("fname")).show()

+----------------+
|properties[hair]|
+----------------+
|           black|
|           brown|
|             red|
|           black|
+----------------+

+----------+
|name.fname|
+----------+
|     James|
|       Ann|
|Tom Cruise|
| Tom Brand|
+----------+



In [66]:

#getItem() used with ArrayType
df.select(df.languages.getItem(1)).show()

#getItem() used with MapType
df.select(df.properties.getItem("hair")).show()


+------------+
|languages[1]|
+------------+
|          C#|
|      Python|
|       Scala|
|        Ruby|
+------------+

+----------------+
|properties[hair]|
+----------------+
|           black|
|           brown|
|             red|
|           black|
+----------------+

