In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 42 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 38.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=b43f2be270f1fd51f4ee452fc43321f86ebb3b176af0e6f3e06e01f2687b0121
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [2]:
from pyspark.sql.functions import col,struct,when
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType
from pyspark.sql import Row

**Create a session**

In [3]:
# Start (or reuse) a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('SparkByExamples.com')
    .getOrCreate()
)

**Create a simple column**

In [4]:
from pyspark.sql.functions import lit
# lit() wraps a Python literal in a Column expression; no DataFrame is involved.
colObj = lit("sparkbyexamples.com")
# Printing shows the Column's repr (the expression), not any row data.
print(colObj)

Column<'sparkbyexamples.com'>


**Create a Column from a DataFrame in multiple ways**

In [5]:
# Two (name, age) records; column names are assigned afterwards with toDF().
data = [
    ("James", 23),
    ("Ann", 40),
]
# The first column name contains a dot, which is why later cells wrap it in backticks.
df = spark.createDataFrame(data).toDF("name.fname", "age")
df.printSchema()
df.show()

root
 |-- name.fname: string (nullable = true)
 |-- age: long (nullable = true)

+----------+---+
|name.fname|age|
+----------+---+
|     James| 23|
|       Ann| 40|
+----------+---+



In [6]:
# Attribute access on the DataFrame.
df.select(df.age).show()
# Bracket access; backticks quote the name because the dot is part of the column name.
df.select(df["`name.fname`"]).show()
# Bracket access with a plain column name.
df.select(df['age']).show()

+---+
|age|
+---+
| 23|
| 40|
+---+

+----------+
|name.fname|
+----------+
|     James|
|       Ann|
+----------+

+---+
|age|
+---+
| 23|
| 40|
+---+



In [7]:
from pyspark.sql.functions import col
# col() builds a Column from its name alone, without referencing the DataFrame object.
df.select(col('age')).show()
# Backticks quote the name because the dot is part of the column name.
df.select(col("`name.fname`")).show()

+---+
|age|
+---+
| 23|
| 40|
+---+

+----------+
|name.fname|
+----------+
|     James|
|       Ann|
+----------+



**Using the Row class with a PySpark DataFrame**

In [8]:
from pyspark.sql import Row

# Each record nests a Row inside `prop`; the printed schema shows it inferred as a struct.
data = [
    Row(name="James", prop=Row(hair="black", eye="blue")),
    Row(name="Ann", prop=Row(hair="grey", eye="black")),
]
df1 = spark.createDataFrame(data)
df1.printSchema()

root
 |-- name: string (nullable = true)
 |-- prop: struct (nullable = true)
 |    |-- hair: string (nullable = true)
 |    |-- eye: string (nullable = true)



In [9]:
# Attribute chain into the struct; the output column is labelled `prop.hair`.
df1.select(df1.prop.hair).show()
# Dotted path through bracket access; the output column is labelled `hair`.
df1.select(df1['prop.hair']).show()
# The same dotted path via col(); also labelled `hair`.
df1.select(col('prop.hair')).show()
# `prop.*` expands every field of the struct into its own column.
df1.select(col('prop.*')).show()

+---------+
|prop.hair|
+---------+
|    black|
|     grey|
+---------+

+-----+
| hair|
+-----+
|black|
| grey|
+-----+

+-----+
| hair|
+-----+
|black|
| grey|
+-----+

+-----+-----+
| hair|  eye|
+-----+-----+
|black| blue|
| grey|black|
+-----+-----+



**PySpark Column functions**



In [11]:
# Sample people table used by the Column-function examples below.
# All values are strings; `lname` and `gender` each contain one None,
# and `gender` also contains one empty string.
columns = ["fname", "lname", "id", "gender"]
data = [
    ("James", "Bond", "100", None),
    ("Ann", "Varsa", "200", 'F'),
    ("Tom Cruise", "XXX", "400", ''),
    ("Tom Brand", None, "400", 'M'),
]
df2 = spark.createDataFrame(data, columns)
df2.printSchema()

root
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)



- **alias()**

In [25]:
from pyspark.sql.functions import expr

# alias() renames a column in the result; it works on bracket- and attribute-style columns alike.
df2.select(df2['fname'].alias('firstname'), df2.lname.alias('lastname')).show()

# alias() also names the result of a SQL expression. The separator is written as a
# single-quoted SQL string literal: double quotes are treated as identifier quotes
# when spark.sql.ansi.doubleQuotedIdentifiers is enabled, which would break "," here.
# The row with a null lname yields a null fullname, as the recorded output shows.
df2.select(expr("fname || ',' || lname").alias('fullname')).show()

+----------+--------+
| firstname|lastname|
+----------+--------+
|     James|    Bond|
|       Ann|   Varsa|
|Tom Cruise|     XXX|
| Tom Brand|    null|
+----------+--------+

+--------------+
|      fullname|
+--------------+
|    James,Bond|
|     Ann,Varsa|
|Tom Cruise,XXX|
|          null|
+--------------+



- **Sort (Ascending/Descending)**

In [29]:
# Sort once ascending by first name, then once descending by last name.
# In the recorded output the row with a null lname comes last under desc().
for ordering in (df2.fname.asc(), df2.lname.desc()):
    df2.sort(ordering).show()

+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|       Ann|Varsa|200|     F|
|     James| Bond|100|  null|
| Tom Brand| null|400|     M|
|Tom Cruise|  XXX|400|      |
+----------+-----+---+------+

+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|Tom Cruise|  XXX|400|      |
|       Ann|Varsa|200|     F|
|     James| Bond|100|  null|
| Tom Brand| null|400|     M|
+----------+-----+---+------+



- **Convert datatype (astype()/cast())**

In [33]:
# Convert the string `id` column to an integer with cast().
df2.select(df2.fname, df2.lname, df2.id.cast("int")).printSchema()
# astype() produces the same resulting schema as cast() here.
df2.select(df2.fname, df2.lname, df2.id.astype("int")).printSchema()

root
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- id: integer (nullable = true)

root
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- id: integer (nullable = true)



- **Filter with between()**

In [34]:
# between() includes both bounds: ids 100 and 200 both appear in the recorded output.
# NOTE(review): `id` is a string column while the bounds are ints, so this relies on
# Spark's implicit type coercion.
id_in_range = df2.id.between(100, 200)
df2.filter(id_in_range).show()

+-----+-----+---+------+
|fname|lname| id|gender|
+-----+-----+---+------+
|James| Bond|100|  null|
|  Ann|Varsa|200|     F|
+-----+-----+---+------+



- **Filter with contains()**

In [37]:
# Keep rows whose first name contains the substring "Ann".
fname_has_ann = df2.fname.contains("Ann")
df2.filter(fname_has_ann).show()

+-----+-----+---+------+
|fname|lname| id|gender|
+-----+-----+---+------+
|  Ann|Varsa|200|     F|
+-----+-----+---+------+



- **Filter with startswith()/endswith()**

In [39]:
# Prefix match first, then suffix match, both on the last name.
for predicate in (df2.lname.startswith("V"), df2.lname.endswith("a")):
    df2.filter(predicate).show()

+-----+-----+---+------+
|fname|lname| id|gender|
+-----+-----+---+------+
|  Ann|Varsa|200|     F|
+-----+-----+---+------+

+-----+-----+---+------+
|fname|lname| id|gender|
+-----+-----+---+------+
|  Ann|Varsa|200|     F|
+-----+-----+---+------+



- **Filter with isNull()/isNotNull()**

In [42]:
# Rows where the last name is missing, followed by the complementary set.
lname_missing = df2.lname.isNull()
lname_present = df2.lname.isNotNull()
df2.filter(lname_missing).show()
df2.filter(lname_present).show()

+---------+-----+---+------+
|    fname|lname| id|gender|
+---------+-----+---+------+
|Tom Brand| null|400|     M|
+---------+-----+---+------+

+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|     James| Bond|100|  null|
|       Ann|Varsa|200|     F|
|Tom Cruise|  XXX|400|      |
+----------+-----+---+------+



- **Filter with like()/rlike()**

In [58]:
# like() takes a SQL LIKE pattern: first names containing "om".
fname_like_om = df2.fname.like("%om%")
df2.filter(fname_like_om).show()

# rlike() takes a regular expression: first names ending in "es".
fname_ends_es = df2.fname.rlike("es$")
names_only = df2.select(df2.fname, df2.lname)
names_only.filter(fname_ends_es).show()

+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|Tom Cruise|  XXX|400|      |
| Tom Brand| null|400|     M|
+----------+-----+---+------+

+-----+-----+
|fname|lname|
+-----+-----+
|James| Bond|
+-----+-----+



- **Conditional values with when()/otherwise()**

In [64]:
from pyspark.sql.functions import when

# Map gender codes to labels. An empty string becomes the literal text "null"
# (a string, not a SQL NULL); any other value falls through to otherwise()
# and keeps the original gender value.
new_gender = (
    when(df2.gender == "M", "Male")
    .when(df2.gender == "F", "Female")
    .when(df2.gender == "", "null")
    .otherwise(df2.gender)
    .alias("New_gender")
)
df2.select(df2.fname, df2.lname, new_gender).show()

+----------+-----+----------+
|     fname|lname|New_gender|
+----------+-----+----------+
|     James| Bond|      null|
|       Ann|Varsa|    Female|
|Tom Cruise|  XXX|      null|
| Tom Brand| null|      Male|
+----------+-----+----------+



- **Filter with isin() - check value in a list**

In [65]:
# Keep rows whose id is one of the listed values (strings, matching the column type).
li = ["100", "200"]
id_in_list = df2.id.isin(li)
df2.filter(id_in_list).show()
           

+-----+-----+---+------+
|fname|lname| id|gender|
+-----+-----+---+------+
|James| Bond|100|  null|
|  Ann|Varsa|200|     F|
+-----+-----+---+------+



- **getField()** - Get a value by key from a MapType column, or a child field by name from a StructType column

In [67]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType

# Each record is (name struct, languages array, properties map).
data=[(("James","Bond"),["Java","C#"],{'hair':'black','eye':'brown'}),
      (("Ann","Varsa"),[".NET","Python"],{'hair':'brown','eye':'black'}),
      (("Tom Cruise",""),["Python","Scala"],{'hair':'red','eye':'grey'}),
      (("Tom Brand",None),["Perl","Ruby"],{'hair':'black','eye':'blue'})]

# Explicit schema for the nested data. It is named `schema` rather than `struct`
# so it does not shadow the `struct` function imported from pyspark.sql.functions
# in the notebook's import cell.
schema = StructType([
    StructField('name', StructType([
        StructField('fname', StringType(), True),
        StructField('lname', StringType(), True),
    ])),
    StructField('languages', ArrayType(StringType()), True),
    StructField('properties', MapType(StringType(), StringType()), True),
])

df3 = spark.createDataFrame(data, schema)
df3.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- fname: string (nullable = true)
 |    |-- lname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [69]:
# getField() on the map column looks up a key; on the struct column it picks a child field.
hair_from_map = df3.properties.getField("hair")
fname_from_struct = df3.name.getField("fname")
df3.select(hair_from_map).show()
df3.select(fname_from_struct).show()

+----------------+
|properties[hair]|
+----------------+
|           black|
|           brown|
|             red|
|           black|
+----------------+

+----------+
|name.fname|
+----------+
|     James|
|       Ann|
|Tom Cruise|
| Tom Brand|
+----------+



In [75]:
# getItem() indexes into the array column (index 1 is the second element in the
# recorded output) and looks up a key in the map column.
second_language = df3.languages.getItem(1)
hair_value = df3.properties.getItem("hair")
df3.select(second_language).show()
df3.select(hair_value).show()

+------------+
|languages[1]|
+------------+
|          C#|
|      Python|
|       Scala|
|        Ruby|
+------------+

+----------------+
|properties[hair]|
+----------------+
|           black|
|           brown|
|             red|
|           black|
+----------------+

