In [1]:
# findspark puts the local Spark installation's Python libraries on sys.path so
# that `pyspark` can be imported below; with spark_home=None it searches for the
# installation itself (per the findspark docs, via SPARK_HOME / common locations).
import findspark

findspark.init(spark_home=None)

In [2]:

from pyspark.sql import SparkSession, types

# One Spark session for the whole notebook. The 'local' master runs Spark
# in-process with a single worker thread; getOrCreate() returns the already
# running session when this cell is re-executed.
spark = (
    SparkSession.builder
    .master('local')
    .appName('qtn1')
    .getOrCreate()
)

# SparkContext handle (not used by the cells shown here).
sc = spark.sparkContext

In [3]:
# Read the pipe-delimited file as raw text: one row per line, in a single
# string column named 'value'.
book_path = 'C:/Users/Shivaraj/pyspark work/book1.csv'
df = spark.read.text(book_path)


In [4]:
# Print the raw lines in full (no column truncation).
df.show(n=20, truncate=False)

+----------------------+
|value                 |
+----------------------+
|NAME|AGE|DEP          |
|Vivek|Chaudhary|32|BSC|
|John|Morgan|30|BE     |
|Ashwin|Rao|30|BE      |
+----------------------+



In [5]:
# The first row holds the header line; it is filtered out of the data later.
header = df.first()['value']
header

'NAME|AGE|DEP'

In [6]:
# Column names for the four '|'-separated fields of each data line.
schema = 'fname lname age dep'.split()
print(schema)

['fname', 'lname', 'age', 'dep']


# Filter out the header row, split each line on '|' and apply the schema

In [7]:


# Drop the header line, split each remaining line on a literal '|' and project
# the pieces onto the names in `schema`. Using the DataFrame `split` function
# keeps the work inside Spark's optimizer, instead of sending every row through
# Python with .rdd.map(...).toDF(...).
# NOTE(review): lines with fewer fields than `schema` are no longer rejected by
# toDF; check how missing items behave on your Spark version/ANSI setting if the
# input may be ragged.
from pyspark.sql.functions import col, split

# split() takes a Java regex, so the pipe must be escaped to match literally.
fields = split(col('value'), r'\|')
df_new = (
    df.filter(col('value') != header)
    .select(*[fields.getItem(i).alias(name) for i, name in enumerate(schema)])
)
df_new.show()

+------+---------+---+---+
| fname|    lname|age|dep|
+------+---------+---+---+
| Vivek|Chaudhary| 32|BSC|
|  John|   Morgan| 30| BE|
|Ashwin|      Rao| 30| BE|
+------+---------+---+---+



# Concatenate the columns “fname” and “lname”

In [8]:

# Import the Spark column functions explicitly. A star import shadows Python
# built-ins such as sum, min, max, round, abs and filter with Spark versions,
# which silently breaks later plain-Python code. Add names here if further
# cells need other functions.
from pyspark.sql.functions import coalesce, col, concat, lit, split

# Full name = fname + ' ' + lname.
df1 = df_new.withColumn('Name', concat(col('fname'), lit(' '), col('lname')))
df1.show()

+------+---------+---+---+---------------+
| fname|    lname|age|dep|           Name|
+------+---------+---+---+---------------+
| Vivek|Chaudhary| 32|BSC|Vivek Chaudhary|
|  John|   Morgan| 30| BE|    John Morgan|
|Ashwin|      Rao| 30| BE|     Ashwin Rao|
+------+---------+---+---+---------------+



# Stack two columns one below the other (union)

In [9]:
# Stack the first-name and last-name columns into a single column. union()
# matches columns by position, so the result keeps the first input's column
# name, 'fname'.
a = df1.select('fname')
b = df1.select('lname')

# show() only prints and returns None, so bind the DataFrame first and then
# display it. Otherwise `c` would be None.
c = a.union(b)
c.show()

+---------+
|    fname|
+---------+
|    Vivek|
|     John|
|   Ashwin|
|Chaudhary|
|   Morgan|
|      Rao|
+---------+



In [10]:
# Keep only the combined name plus age and dep. select() already discards every
# column it does not list, so a separate drop('fname', 'lname') is unnecessary.
# Refer to the column by its exact name 'Name' (rather than relying on Spark's
# case-insensitive column resolution) and alias it to 'name' for display.
df1.select(df1['Name'].alias('name'), 'age', 'dep').show()

+---------------+---+---+
|           name|age|dep|
+---------------+---+---+
|Vivek Chaudhary| 32|BSC|
|    John Morgan| 30| BE|
|     Ashwin Rao| 30| BE|
+---------------+---+---+



# Merge two columns into one

In [11]:

# Build "<fname> <lname>" as a single Fullnames column and keep age and dep.
full_name = concat(df_new['fname'], lit(" "), df_new['lname']).alias("Fullnames")
df2 = df_new.select(full_name, 'age', 'dep')

In [12]:
# Display df2 with show()'s default row limit and truncation spelled out.
df2.show(n=20, truncate=True)

+---------------+---+---+
|      Fullnames|age|dep|
+---------------+---+---+
|Vivek Chaudhary| 32|BSC|
|    John Morgan| 30| BE|
|     Ashwin Rao| 30| BE|
+---------------+---+---+



# Split a column into two

In [13]:
# Split Fullnames on a single space: item 0 becomes the first name and item 1
# the last name; the combined column is then dropped.
# NOTE(review): only the second token is kept as the last name, and the column
# name is spelled "Fristname". The spelling is left as is because the next cell
# reads df3.Fristname.
name_parts = split(col("Fullnames"), " ")
df3 = (
    df2
    .withColumn("Fristname", name_parts.getItem(0))
    .withColumn("lastname", name_parts.getItem(1))
    .drop("Fullnames")
)
df3.show()

+---+---+---------+---------+
|age|dep|Fristname| lastname|
+---+---+---------+---------+
| 32|BSC|    Vivek|Chaudhary|
| 30| BE|     John|   Morgan|
| 30| BE|   Ashwin|      Rao|
+---+---+---------+---------+



In [14]:
# Re-join the split first/last name into one "fullname" column.
rejoined = concat(df3['Fristname'], lit(" "), df3['lastname']).alias("fullname")
df5 = df3.select(rejoined, "age", "dep")
df5.show()

+---------------+---+---+
|       fullname|age|dep|
+---------------+---+---+
|Vivek Chaudhary| 32|BSC|
|    John Morgan| 30| BE|
|     Ashwin Rao| 30| BE|
+---------------+---+---+



# Select multiple columns

In [15]:
# Project two columns, passing the names as separate arguments.
df5.select('age', 'dep').show()

+---+---+
|age|dep|
+---+---+
| 32|BSC|
| 30| BE|
| 30| BE|
+---+---+



# Rename columns

In [16]:
# Rename 'dep' to 'Stream'; the other columns and their order are unchanged.
df6 = df5.withColumnRenamed(existing='dep', new='Stream')
df6.show()

+---------------+---+------+
|       fullname|age|Stream|
+---------------+---+------+
|Vivek Chaudhary| 32|   BSC|
|    John Morgan| 30|    BE|
|     Ashwin Rao| 30|    BE|
+---------------+---+------+



# coalesce: take the first non-null value across several columns

In [17]:
# Contact numbers are identifiers, not quantities. Read every column as a string
# (inferSchema=False) so values keep any leading zeros or '+' prefixes, instead
# of letting schema inference turn them into numeric columns.
MOB_NUM_CSV = "D:/Shivaraj/learning/dataset-master/dataset-master/mob_num.csv"
qf = spark.read.csv(MOB_NUM_CSV, header=True, inferSchema=False)
qf.show()

+-------+---------------+-----------+---------+
|   Name|Personal_Mobile|Home_Mobile|office_no|
+-------+---------------+-----------+---------+
|   Adhi|     9876326754| 9358416345|     null|
|  Arjun|           null|       null|442644325|
|Prathap|     9876544564|       null|     null|
|  Manju|           null|       null|     null|
| Ariyan|           null| 8761524533|     null|
+-------+---------------+-----------+---------+



In [18]:
# For every person, take the first available number (personal, then home, then
# office); if all three are null, use "not_available".
# Each column is cast to string explicitly, so the result does not depend on
# implicit type coercion between numeric columns and the string literal.
phone_columns = ["Personal_Mobile", "Home_Mobile", "office_no"]
output = qf.withColumn(
    "new_Number",
    coalesce(*[col(c).cast("string") for c in phone_columns], lit("not_available")),
)
output.show()

+-------+---------------+-----------+---------+-------------+
|   Name|Personal_Mobile|Home_Mobile|office_no|   new_Number|
+-------+---------------+-----------+---------+-------------+
|   Adhi|     9876326754| 9358416345|     null|   9876326754|
|  Arjun|           null|       null|442644325|    442644325|
|Prathap|     9876544564|       null|     null|   9876544564|
|  Manju|           null|       null|     null|not_available|
| Ariyan|           null| 8761524533|     null|   8761524533|
+-------+---------------+-----------+---------+-------------+



In [19]:
# Show only each person's name and the chosen contact number.
output.select(["Name", "new_Number"]).show()

+-------+-------------+
|   Name|   new_Number|
+-------+-------------+
|   Adhi|   9876326754|
|  Arjun|    442644325|
|Prathap|   9876544564|
|  Manju|not_available|
| Ariyan|   8761524533|
+-------+-------------+



In [21]:
# Number of partitions backing `output`, read through its underlying RDD.
output_rdd = output.rdd
output_rdd.getNumPartitions()

1

In [27]:
# Unique values of the Name column.
unique_names = output.select("Name").distinct()
unique_names.show()

+-------+
|   Name|
+-------+
|  Manju|
| Ariyan|
|  Arjun|
|Prathap|
|   Adhi|
+-------+



In [28]:
# Total number of rows in `output`.
row_count = output.count()
row_count

5

In [41]:
# Repartition at the DataFrame level; no RDD conversion is needed for that.
# repartition() returns a new DataFrame and never changes `output` in place,
# so keep the result under its own name instead of discarding it, then report
# its partition count.
NUM_PARTITIONS = 3
output_repartitioned = output.repartition(NUM_PARTITIONS)
output_repartitioned.rdd.getNumPartitions()

3