<a href="https://colab.research.google.com/github/DHEERAJ264/Apache-Spark/blob/master/spark_transformations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install pyspark


Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
[K     |████████████████████████████████| 215.7MB 103kB/s 
[?25hCollecting py4j==0.10.7 (from pyspark)
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 39.9MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130387 sha256=001f66009882c60a1567c0d183b95516a0fa97d0c3635699a0cb49fb3c83b44b
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 py

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *


from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)


In [0]:
df = spark.read.json("/content/persons.json")

In [6]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- height: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)



In [7]:
df.show()

+---+------+-------+------+
|age|height|   name|   sex|
+---+------+-------+------+
| 14|   156|  Alice|female|
| 21|   181|    Bob|  male|
| 27|   176|Charlie|  male|
| 24|   167|    Eve|female|
| 19|   172|Frances|female|
| 31|   191| George|  male|
+---+------+-------+------+



In [0]:
persons = spark.read \
          .option('inferSchema',True) \
          .option('header',True) \
           .csv('/content/persons_header.csv')

In [9]:
persons.show()

+---+------+-------+------+
|age|height|   name|   sex|
+---+------+-------+------+
| 23|   156|  Alice|female|
| 21|   181|    Bob|  male|
| 27|   176|Charlie|  male|
| 24|   167|    Eve|female|
| 19|   172|Frances|female|
| 31|   191| George|female|
+---+------+-------+------+



In [10]:
persons.printSchema()


root
 |-- age: integer (nullable = true)
 |-- height: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)



In [11]:
result = persons.select(persons.name,persons.age)
result.show()

+-------+---+
|   name|age|
+-------+---+
|  Alice| 23|
|    Bob| 21|
|Charlie| 27|
|    Eve| 24|
|Frances| 19|
| George| 31|
+-------+---+



In [0]:
result = persons.select(persons.name,upper(persons.name))

In [13]:
result.show()

+-------+-----------+
|   name|upper(name)|
+-------+-----------+
|  Alice|      ALICE|
|    Bob|        BOB|
|Charlie|    CHARLIE|
|    Eve|        EVE|
|Frances|    FRANCES|
| George|     GEORGE|
+-------+-----------+



In [0]:
result_alias = persons.select(persons.name,upper(persons.name)).alias('names_new')

In [15]:
result_alias.show()

+-------+-----------+
|   name|upper(name)|
+-------+-----------+
|  Alice|      ALICE|
|    Bob|        BOB|
|Charlie|    CHARLIE|
|    Eve|        EVE|
|Frances|    FRANCES|
| George|     GEORGE|
+-------+-----------+



In [0]:
result_alias = persons.select(persons.name,(persons.age**2).alias('age_sqr'))

In [17]:
result_alias.show()

+-------+-------+
|   name|age_sqr|
+-------+-------+
|  Alice|  529.0|
|    Bob|  441.0|
|Charlie|  729.0|
|    Eve|  576.0|
|Frances|  361.0|
| George|  961.0|
+-------+-------+



In [18]:
result = persons.select(concat(when(persons.sex == 'male', "Mr ").otherwise("Mrs "), persons.name).alias("salutation"))
result.show()

+-----------+
| salutation|
+-----------+
|  Mrs Alice|
|     Mr Bob|
| Mr Charlie|
|    Mrs Eve|
|Mrs Frances|
| Mrs George|
+-----------+



In [19]:
result = persons.select(concat(lit('Name:'), persons.name, lit(' Age:'), persons.age).alias('text'))
result.show()

+-------------------+
|               text|
+-------------------+
|  Name:Alice Age:23|
|    Name:Bob Age:21|
|Name:Charlie Age:27|
|    Name:Eve Age:24|
|Name:Frances Age:19|
| Name:George Age:31|
+-------------------+



In [20]:
result2 = result.drop("name","age")
result2.show()

+-------------------+
|               text|
+-------------------+
|  Name:Alice Age:23|
|    Name:Bob Age:21|
|Name:Charlie Age:27|
|    Name:Eve Age:24|
|Name:Frances Age:19|
| Name:George Age:31|
+-------------------+



In [21]:
result = persons.filter(persons.age > 22)
result.show()

+---+------+-------+------+
|age|height|   name|   sex|
+---+------+-------+------+
| 23|   156|  Alice|female|
| 27|   176|Charlie|  male|
| 24|   167|    Eve|female|
| 31|   191| George|female|
+---+------+-------+------+



In [22]:
df = spark.createDataFrame([('Bob',),('Alice',),('Bob',)], ['name'])
df.show()

+-----+
| name|
+-----+
|  Bob|
|Alice|
|  Bob|
+-----+



In [23]:
df.distinct()

DataFrame[name: string]

In [24]:
df.show()

+-----+
| name|
+-----+
|  Bob|
|Alice|
|  Bob|
+-----+

