<a href="https://colab.research.google.com/github/dipteshnath/Python/blob/master/Spark5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.3.2/spark-2.3.2-bin-hadoop2.7.tgz
!tar xf spark-2.3.2-bin-hadoop2.7.tgz
!pip install -q findspark


In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.2-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show(3)



+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows



In [0]:
from google.colab import files
files.upload()


Saving 2015-summary.json to 2015-summary.json


{'2015-summary.json': b'{"ORIGIN_COUNTRY_NAME":"Romania","DEST_COUNTRY_NAME":"United States","count":15}\n{"ORIGIN_COUNTRY_NAME":"Croatia","DEST_COUNTRY_NAME":"United States","count":1}\n{"ORIGIN_COUNTRY_NAME":"Ireland","DEST_COUNTRY_NAME":"United States","count":344}\n{"ORIGIN_COUNTRY_NAME":"United States","DEST_COUNTRY_NAME":"Egypt","count":15}\n{"ORIGIN_COUNTRY_NAME":"India","DEST_COUNTRY_NAME":"United States","count":62}\n{"ORIGIN_COUNTRY_NAME":"Singapore","DEST_COUNTRY_NAME":"United States","count":1}\n{"ORIGIN_COUNTRY_NAME":"Grenada","DEST_COUNTRY_NAME":"United States","count":62}\n{"ORIGIN_COUNTRY_NAME":"United States","DEST_COUNTRY_NAME":"Costa Rica","count":588}\n{"ORIGIN_COUNTRY_NAME":"United States","DEST_COUNTRY_NAME":"Senegal","count":40}\n{"ORIGIN_COUNTRY_NAME":"United States","DEST_COUNTRY_NAME":"Moldova","count":1}\n{"ORIGIN_COUNTRY_NAME":"Sint Maarten","DEST_COUNTRY_NAME":"United States","count":325}\n{"ORIGIN_COUNTRY_NAME":"Marshall Islands","DEST_COUNTRY_NAME":"Unite

In [0]:
spark.read.format("json").load("2015-summary.json").schema

StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))

In [0]:
from pyspark.sql.types import StructField,StructType,StringType,LongType
myManualSchema=StructType([StructField("DEST_COUNTRY_NAME", StringType(),True),
                          StructField("ORIGIN_COUNTRY_NAME",StringType(),True),
                          StructField("count", LongType(),False,metadata={"hello":"world"})])
df=spark.read.format("json").schema(myManualSchema).load("2015-summary.json")

In [0]:
df.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import col,column
df.select(col("count")).show(5)

+-----+
|count|
+-----+
|   15|
|    1|
|  344|
|   15|
|   62|
+-----+
only showing top 5 rows



In [0]:
df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

In [0]:
from pyspark.sql import Row
myRow=Row("Hello",None,1,False)

In [0]:
myRow[0]

'Hello'

In [0]:
from pyspark.sql import Row
from pyspark.sql.types import StructField,StructType,StringType,LongType
myManualSchema=StructType([StructField("some",StringType(),True),
                          StructField("col",StringType(),True),
                          StructField("names",LongType(),False)
                          ])
myRow=Row("Hello",None,1)
myDf=spark.createDataFrame([myRow],myManualSchema)
myDf.show()

+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|null|    1|
+-----+----+-----+



In [0]:
df.select("DEST_COUNTRY_NAME").show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [0]:
df.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME").show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [0]:
from pyspark.sql.functions import expr,col,column
df.select(expr("DEST_COUNTRY_NAME"),col("DEST_COUNTRY_NAME"),column("DEST_COUNTRY_NAME")).show(2)

+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows



In [0]:
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



In [0]:
df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME")).show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [0]:
df.selectExpr("DEST_COUNTRY_NAME as newColumnName","DEST_COUNTRY_NAME").show()

+--------------------+--------------------+
|       newColumnName|   DEST_COUNTRY_NAME|
+--------------------+--------------------+
|       United States|       United States|
|       United States|       United States|
|       United States|       United States|
|               Egypt|               Egypt|
|       United States|       United States|
|       United States|       United States|
|       United States|       United States|
|          Costa Rica|          Costa Rica|
|             Senegal|             Senegal|
|             Moldova|             Moldova|
|       United States|       United States|
|       United States|       United States|
|              Guyana|              Guyana|
|               Malta|               Malta|
|            Anguilla|            Anguilla|
|             Bolivia|             Bolivia|
|       United States|       United States|
|             Algeria|             Algeria|
|Turks and Caicos ...|Turks and Caicos ...|
|       United States|       Uni

In [0]:
df.selectExpr("*","(DEST_COUNTRY_NAME=ORIGIN_COUNTRY_NAME) as withinCountry").show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [0]:
df.selectExpr("avg(count)","count(distinct(DEST_COUNTRY_NAME))").show(2)

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



In [0]:
from pyspark.sql.functions import lit
df.select(expr("*"),lit(1).alias("One")).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



In [0]:
df.withColumn("numberOne",lit(1)).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [0]:
df.withColumn("withinCountry",expr("ORIGIN_COUNTRY_NAME==DEST_COUNTRY_NAME")).show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [0]:
df.withColumn("Destination",expr("DEST_COUNTRY_NAME")).columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count', 'Destination']

In [0]:
df.withColumnRenamed("DEST_COUNTRY_NAME","dest").columns

['dest', 'ORIGIN_COUNTRY_NAME', 'count']

In [0]:
dfWithLongCOlumnName=df.withColumn("This Long Column-Name",expr("ORIGIN_COUNTRY_NAME"))

In [0]:
dfWithLongCOlumnName.show(2)

+-----------------+-------------------+-----+---------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|This Long Column-Name|
+-----------------+-------------------+-----+---------------------+
|    United States|            Romania|   15|              Romania|
|    United States|            Croatia|    1|              Croatia|
+-----------------+-------------------+-----+---------------------+
only showing top 2 rows



In [0]:
dfWithLongCOlumnName.selectExpr(
    "`This Long Column-Name`",
    "`This Long Column-Name` as `new col`")\
  .show(2)

+---------------------+-------+
|This Long Column-Name|new col|
+---------------------+-------+
|              Romania|Romania|
|              Croatia|Croatia|
+---------------------+-------+
only showing top 2 rows



In [0]:
dfWithLongCOlumnName.select(expr("`This Long Column-Name")).columns

ParseException: ignored

In [0]:
dfWithLongCOlumnName.drop("ORIGIN_COUNTRY_NAME","DEST_COUNTRY_NAME")

DataFrame[count: bigint, This Long Column-Name: string]

In [0]:
dfWithLongCOlumnName.show(2)

+-----------------+-------------------+-----+---------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|This Long Column-Name|
+-----------------+-------------------+-----+---------------------+
|    United States|            Romania|   15|              Romania|
|    United States|            Croatia|    1|              Croatia|
+-----------------+-------------------+-----+---------------------+
only showing top 2 rows



In [0]:
df.withColumn("count2",col("count").cast("long"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint, count2: bigint]

In [0]:
df.filter(col("count")<2).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [0]:
df.where("count<2").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [0]:
df.where(col("count")<2).where(col("ORIGIN_COUNTRY_NAME")!="Croatia").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [0]:
df.select("ORIGIN_COUNTRY_NAME","DEST_COUNTRY_NAME").distinct().count()

256

In [0]:
df.select("ORIGIN_COUNTRY_NAME").distinct().count()

125

In [0]:
seed=5
withReplacement=False
fraction=0.5
df.sample(withReplacement,fraction,seed).count()

126

In [0]:
dataFrame=df.randomSplit([0.25,0.75],seed)
dataFrame[0].count()>dataFrame[1].count()

False

In [0]:
dataFrames=df.randomSplit([0.25,0.75],seed)
dataFrames[0].count()>dataFrames[1].count()

False

In [0]:
from pyspark.sql import Row
schema=df.schema
newRows=[
    Row("New Country","Other Country",5L),
    Row("New Country 2","Other Country 3", 1L)
]
parallelizedRows=spark.sparkContext.parallelize(newRows)
newDF=spark.createDataFrame(parallelizedRows,schema)
df.union(newDF).where("count=1").where(col("ORIGIN_COUNTRY_NAME")!="United States").show()

SyntaxError: ignored

In [0]:
df.sort("count").show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows



In [0]:
df.orderBy("count","DEST_COUNTRY_NAME").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
df.orderBy(col("count"),col("DEST_COUNTRY_NAME")).show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import desc,asc
df.orderBy(expr("count desc")).show(5)
df.orderBy(col("count").desc(),col("DEST_COUNTRY_NAME").asc()).show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
|           Mexico|      United States|  7140|
+-----------------+-------------------+------+
only showing top 5 rows



In [0]:
#spark.read.format("json").load("data/path".sortWithinPartitions("count"))

In [0]:
df.limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+



In [0]:
df.orderBy(expr("count desc")).limit(6).show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
+--------------------+-------------------+-----+



In [0]:
df.rdd.getNumPartitions()

1

In [0]:
df.repartition(col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [0]:
df.rdd.getNumPartitions()

1

In [0]:
df.repartition(5,col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [0]:
df.rdd.getNumPartitions()

1

In [0]:
df.repartition(5,col("DEST_COUNTRY_NAME")).coalesce(2)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [0]:
collectDF=df.limit(10)
collectDF.take(5)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)]

In [0]:
collectDF.show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+



In [0]:
collectDF.show(5,False)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India              |62   |
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
collectDF.collect()

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [0]:
for i in collectDF.toLocalIterator():print(i)

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1)
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)
Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15)
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62)
Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588)
Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40)
Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)


In [0]:
seed=5
dataFrames1 = df.randomSplit([0.25, 0.75], seed)
dataFrames1[0].count() > dataFrames1[1].count()

False

In [0]:
dataFrames1[0].show(5)
dataFrames1[1].show(5)


+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|             Algeria|      United States|    4|
|             Austria|      United States|   62|
|          Azerbaijan|      United States|   21|
|              Belize|      United States|  188|
|British Virgin Is...|      United States|  107|
+--------------------+-------------------+-----+
only showing top 5 rows

+-------------------+-------------------+-----+
|  DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-------------------+-------------------+-----+
|             Angola|      United States|   15|
|           Anguilla|      United States|   41|
|Antigua and Barbuda|      United States|  126|
|          Argentina|      United States|  180|
|              Aruba|      United States|  346|
+-------------------+-------------------+-----+
only showing top 5 rows

