In [0]:
spark

In [0]:
df1 = spark.read.format("json").load("dbfs:/FileStore/shared_uploads/abhishekangne7@gmail.com/2015_summary.json")

In [0]:
df1.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [0]:
df1.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [0]:
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType

myManualSchema = StructType
([
    StructField("DEST_COUNTRY_NAME",StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME",StringType(), True),
    StructField("count",IntegerType(), False, metadata = {"name":"Abhishek"}),
])

Out[18]: [StructField('DEST_COUNTRY_NAME', StringType(), True),
 StructField('ORIGIN_COUNTRY_NAME', StringType(), True),
 StructField('count', IntegerType(), False)]

In [0]:
df1.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

In [0]:
# Initialize Spark session
spark = SparkSession.builder.appName("AA").getOrCreate()

In [0]:
myManualSchema = StructType
([
    StructField("DEST_COUNTRY_NAME",StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME",StringType(), True),
    StructField("count",LongType(), False, metadata = {"name":"Abhishek"}),
])

Out[34]: [StructField('DEST_COUNTRY_NAME', StringType(), True),
 StructField('ORIGIN_COUNTRY_NAME', StringType(), True),
 StructField('count', LongType(), False)]

In [0]:
print(type(myManualSchema))

<class 'type'>


In [0]:
print(type(myManualSchema))

<class 'type'>


In [0]:
schema_ddl = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"


In [0]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("AA").getOrCreate()

# Load the JSON file with the defined schema
df1 = spark.read.format("json").schema(schema_ddl).load("dbfs:/FileStore/shared_uploads/abhishekangne7@gmail.com/2015_summary.json")

# Show the DataFrame
df1.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [0]:
df1.count()

Out[40]: 256

In [0]:
from pyspark.sql.functions import col, column, expr

In [0]:
expr("(((count+5)*200)-6) < count*1")

Out[45]: Column<'((((count + 5) * 200) - 6) < (count * 1))'>

In [0]:
df1.columns

Out[46]: ['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

In [0]:
from pyspark.sql import Row
myRow = Row("Hellooo", None, 1, False)

In [0]:
myRow[0]


Out[51]: 'Hellooo'

In [0]:
myRow[2]

Out[52]: 1

In [0]:
df1.createOrReplaceTempView("df1Table")

In [0]:
df1.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [0]:
from pyspark.sql.functions import expr, col, column
df1.select(
		  expr("DEST_COUNTRY_NAME"),
		  col("DEST_COUNTRY_NAME"),
		  column("DEST_COUNTRY_NAME")).show(2)

+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows



In [0]:
df1.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



In [0]:
#or you can use alias on top the expr 
df1.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME")).show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [0]:
df1.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)

+-------------+-----------------+
|newColumnName|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows



In [0]:
df1.selectExpr(
    "*",
    "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) AS withinCountry")\
    .show(4)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
|    United States|            Ireland|  344|        false|
|            Egypt|      United States|   15|        false|
+-----------------+-------------------+-----+-------------+
only showing top 4 rows



In [0]:
from pyspark.sql.functions import lit
df1.select(expr("*"), lit(-999).alias("negative_999")).show(20)

+--------------------+-------------------+-----+------------+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|negative_999|
+--------------------+-------------------+-----+------------+
|       United States|            Romania|   15|        -999|
|       United States|            Croatia|    1|        -999|
|       United States|            Ireland|  344|        -999|
|               Egypt|      United States|   15|        -999|
|       United States|              India|   62|        -999|
|       United States|          Singapore|    1|        -999|
|       United States|            Grenada|   62|        -999|
|          Costa Rica|      United States|  588|        -999|
|             Senegal|      United States|   40|        -999|
|             Moldova|      United States|    1|        -999|
|       United States|       Sint Maarten|  325|        -999|
|       United States|   Marshall Islands|   39|        -999|
|              Guyana|      United States|   64|        -999|
|       

In [0]:
from pyspark.sql.functions import column
df1.withColumn("numberSeven", lit(7)).show(3)

+-----------------+-------------------+-----+-----------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberSeven|
+-----------------+-------------------+-----+-----------+
|    United States|            Romania|   15|          7|
|    United States|            Croatia|    1|          7|
|    United States|            Ireland|  344|          7|
+-----------------+-------------------+-----+-----------+
only showing top 3 rows



In [0]:
df1.withColumnRenamed("DEST_COUNTRY_NAME","DEST").columns

Out[79]: ['DEST', 'ORIGIN_COUNTRY_NAME', 'count']

In [0]:
df1.drop("numberSeven").columns

Out[81]: ['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

In [0]:
df1.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [0]:
df1.withColumn("count2",col("count").cast("long"))

Out[90]: DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: int, count2: bigint]

In [0]:
df1.show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [0]:
df1.withColumn("count2",col("count")+2).show(3)

+-----------------+-------------------+-----+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|count2|
+-----------------+-------------------+-----+------+
|    United States|            Romania|   15|    17|
|    United States|            Croatia|    1|     3|
|    United States|            Ireland|  344|   346|
+-----------------+-------------------+-----+------+
only showing top 3 rows



In [0]:
df1.filter(col("count") < 2).show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Croatia|    1|
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
|               Malta|      United States|    1|
|       United States|          Gibraltar|    1|
|Saint Vincent and...|      United States|    1|
|            Suriname|      United States|    1|
|       United States|             Cyprus|    1|
|        Burkina Faso|      United States|    1|
|            Djibouti|      United States|    1|
|       United States|            Estonia|    1|
|              Zambia|      United States|    1|
|              Cyprus|      United States|    1|
|       United States|          Lithuania|    1|
|       United States|           Bulgaria|    1|
|       United States|            Georgia|    1|
|       United States|            Bahrain|    1|
|       Cote d'Ivoir

In [0]:
df1.where(col("count") < 3).show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Croatia|    1|
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
|               Malta|      United States|    1|
|       United States|          Gibraltar|    1|
|Saint Vincent and...|      United States|    1|
|            Suriname|      United States|    1|
|       United States|             Cyprus|    1|
|             Liberia|      United States|    2|
|             Hungary|      United States|    2|
|       United States|            Vietnam|    2|
|        Burkina Faso|      United States|    1|
|            Djibouti|      United States|    1|
|       United States|            Estonia|    1|
|              Zambia|      United States|    1|
|            Malaysia|      United States|    2|
|             Croatia|      United States|    2|
|              Cypru

In [0]:
df1.rdd.getNumPartitions()

Out[100]: 1

In [0]:
df1.repartition(5)

Out[101]: DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: int]

In [0]:
df1.rdd.getNumPartitions()

Out[102]: 1

In [0]:
spark

In [0]:
df1.repartition(5,col("ORIGIN_COUNTRY_NAME")).coalesce(3)

Out[104]: DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: int]

In [0]:
df1.collect()

Out[105]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Sint Maarten', count=325),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Marshall Islands', co

In [0]:
df1.take(4)

Out[106]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15)]

In [0]:
df1.where("count > 2").limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|            Grenada|   62|
+-----------------+-------------------+-----+



In [0]:
# Create a temporary view
df1.createOrReplaceTempView("df1_view")

# Run a SQL query on the temporary view
result_df = spark.sql("SELECT * FROM df1_view WHERE count > 2 ORDER BY count DESC")
result_df.limit(5).show()

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
|           Mexico|      United States|  7140|
+-----------------+-------------------+------+

