In [0]:
from pyspark.sql.types import StructField,StructType,StringType,LongType

In [0]:
from pyspark.sql.functions import asc,desc,col,count

In [0]:
Input_path="dbfs:/FileStore/users.json"

In [0]:
user_schema=StructType(
  [
    StructField("age",LongType(),True),
    StructField("gender",StringType(),True),
    StructField("name",StringType(),True),
    StructField("phone",StringType(),True),
    StructField("userid",StringType(),True) 
  ]
)

In [0]:
#df=spark.read.json(Input_path)
#df=spark.read.format("json").load(Input_path)
#df=spark.read.format("json").schema(user_schema).load(Input_path)
df=spark.read.load(Input_path,format="json")
df.show()

+----+------+------+----------+------+
| age|gender|  name|     phone|userid|
+----+------+------+----------+------+
|  25|  Male| Satya|8501099876|     1|
|null|  Male| Vivek|5676599876|     2|
|  25|  Male| Rahim|      null|     3|
|  28|Female|Sandra|8508899001|     4|
|  15|Female|Keerti|      null|     5|
|  13|  Male|Sundar|8522233456|     6|
|  28|  Male| Steve|8501085009|     7|
|  55|Female|Smriti|9246655498|     8|
+----+------+------+----------+------+



In [0]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- userid: long (nullable = true)



In [0]:
df1=df.select("age","name")\
  .filter("age is not null")\
    .orderBy("age")\
      .groupBy("age").count()\
        .limit(5)
df1.show()

      
        

+---+-----+
|age|count|
+---+-----+
| 25|    2|
| 28|    2|
| 55|    1|
| 13|    1|
| 15|    1|
+---+-----+



In [0]:
spark.catalog.listTables()

Out[11]: [Table(name='users', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [0]:
df.createOrReplaceTempView("users")

In [0]:
query="""select age,count(*) as count from users
          where age is not null
          group by age
          order by age
          limit 4
          """
df2=spark.sql(query)
df2.show()

+---+-----+
|age|count|
+---+-----+
| 13|    1|
| 15|    1|
| 25|    2|
| 28|    2|
+---+-----+



In [0]:
%sql
select age, count(*) as count from users
where age is not null
group by age
order by age
limit 5

age,count
13,1
15,1
25,2
28,2
55,1


In [0]:
output_path="/FileStore/outPut_json"

In [0]:
df2.write.format("json").mode("ignore").save(output_path)

In [0]:
df=spark.read.json("/FileStore/outPut_json")
df.show()

+---+-----+
|age|count|
+---+-----+
| 55|    1|
| 28|    2|
| 25|    2|
| 15|    1|
| 13|    1|
+---+-----+



In [0]:
%fs ls /FileStore/outPut_json

path,name,size,modificationTime
dbfs:/FileStore/outPut_json/_SUCCESS,_SUCCESS,0,1719401598000
dbfs:/FileStore/outPut_json/_committed_8445401531700954223,_committed_8445401531700954223,115,1719401598000
dbfs:/FileStore/outPut_json/_started_8445401531700954223,_started_8445401531700954223,0,1719401597000
dbfs:/FileStore/outPut_json/part-00000-tid-8445401531700954223-8e5a5945-480c-465e-b920-140405f1b47d-1247-1-c000.json,part-00000-tid-8445401531700954223-8e5a5945-480c-465e-b920-140405f1b47d-1247-1-c000.json,105,1719401597000


In [0]:
dft=spark.read.text("dbfs:/FileStore/outPut_json/part-00000-tid-8445401531700954223-8e5a5945-480c-465e-b920-140405f1b47d-1247-1-c000.json")
dft.show()


+--------------------+
|               value|
+--------------------+
|{"age":55,"count":1}|
|{"age":28,"count":2}|
|{"age":25,"count":2}|
|{"age":15,"count":1}|
|{"age":13,"count":1}|
+--------------------+



In [0]:
spark.catalog.listTables()

Out[28]: [Table(name='users', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [0]:
df.createOrReplaceGlobalTempView("global_users")

In [0]:
qy="""select * from global_temp.global_users
where phone is not null and age is not null
order by age
limit 5
"""
dfg=spark.sql(qy)
dfg.show()

+---+------+------+----------+------+
|age|gender|  name|     phone|userid|
+---+------+------+----------+------+
| 13|  Male|Sundar|8522233456|     6|
| 25|  Male| Satya|8501099876|     1|
| 28|Female|Sandra|8508899001|     4|
| 28|  Male| Steve|8501085009|     7|
| 55|Female|Smriti|9246655498|     8|
+---+------+------+----------+------+



In [0]:
%sql
select * from global_temp.global_users
where age is not null and gender = 'Male'
order by age
limit 5


age,gender,name,phone,userid
13,Male,Sundar,8522233456.0,6
25,Male,Satya,8501099876.0,1
25,Male,Rahim,,3
28,Male,Steve,8501085009.0,7


In [0]:
path_json="dbfs:/FileStore/cts_data/json/2013_summary.json"
path_ot_json="FilesStore/output/json"

In [0]:
#df=spark.read.json(path_json)
# df=spark.read.format("json").load(path_json)
#df= spark.read.json(path_json,schema=myschema)
# df=spark.read.load(path_json,format="json")
df=spark.read.format("json").schema(myschema).load(path_json)

In [0]:
myschema=StructType(
  [
  StructField("DEST_COUNTRY_NAME",StringType(),True),
  StructField("ORIGIN_COUNTRY_NAME",StringType(),True),
  StructField("count",LongType(),True)
 ]
)

In [0]:
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   12|
|       United States|            Croatia|    1|
|       United States|            Ireland|  266|
|               Egypt|      United States|   13|
|       United States|              India|   60|
|   Equatorial Guinea|      United States|    1|
|       United States|              Niger|    1|
|       United States|          Singapore|   22|
|       United States|            Grenada|   40|
|          Costa Rica|      United States|  509|
|             Senegal|      United States|   28|
|              Guyana|      United States|   34|
|       United States|       Sint Maarten|  260|
|       United States|   Marshall Islands|   33|
|             Bolivia|      United States|   33|
|            Anguilla|      United States|   22|
|       United States|           Paraguay|   15|
|             Algeri

In [0]:
df1=df.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME","count")\
  .where("count>500 and DEST_COUNTRY_NAME='United States'")\
    .orderBy("DEST_COUNTRY_NAME")\
      .groupBy("DEST_COUNTRY_NAME").count()
df1.show()

+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|    United States|   16|
+-----------------+-----+



In [0]:
df.show(4,False,False)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |12   |
|United States    |Croatia            |1    |
|United States    |Ireland            |266  |
|Egypt            |United States      |13   |
+-----------------+-------------------+-----+
only showing top 4 rows



In [0]:
df.show(2,False,True)

-RECORD 0----------------------------
 DEST_COUNTRY_NAME   | United States 
 ORIGIN_COUNTRY_NAME | Romania       
 count               | 12            
-RECORD 1----------------------------
 DEST_COUNTRY_NAME   | United States 
 ORIGIN_COUNTRY_NAME | Croatia       
 count               | 1             
only showing top 2 rows



In [0]:
df1.write.format("json").mode("overwrite").save(path_ot_json)

In [0]:
dfj=spark.read.json("dbfs:/FilesStore/output/json")
dfj.show()

+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|    United States|   16|
+-----------------+-----+



In [0]:
%fs ls "dbfs:/FilesStore/output/json"

path,name,size,modificationTime
dbfs:/FilesStore/output/json/_SUCCESS,_SUCCESS,0,1731575110000
dbfs:/FilesStore/output/json/_committed_7200029083853766027,_committed_7200029083853766027,213,1731575109000
dbfs:/FilesStore/output/json/_committed_973489389122606716,_committed_973489389122606716,114,1719406134000
dbfs:/FilesStore/output/json/_committed_vacuum4805902389785269829,_committed_vacuum4805902389785269829,95,1731575110000
dbfs:/FilesStore/output/json/_started_7200029083853766027,_started_7200029083853766027,0,1731575109000
dbfs:/FilesStore/output/json/part-00000-tid-7200029083853766027-d571bc98-9627-4ee7-867f-adff25c2a306-19-1-c000.json,part-00000-tid-7200029083853766027-d571bc98-9627-4ee7-867f-adff25c2a306-19-1-c000.json,49,1731575109000


In [0]:
df=spark.read.text("dbfs:/FilesStore/output/json/part-00000-tid-7200029083853766027-d571bc98-9627-4ee7-867f-adff25c2a306-19-1-c000.json")
df.show()

+--------------------+
|               value|
+--------------------+
|{"DEST_COUNTRY_NA...|
+--------------------+



In [0]:
inpu_path_mltlne="dbfs:/FileStore/users_multiline.json"

In [0]:
df=spark.read.json(inpu_path_mltlne,multiLine=True)
df.show()

+----+------+-----+----------+------+
| age|gender| name|     phone|userid|
+----+------+-----+----------+------+
|  25|  Male|Satya|8501099876|     1|
|null|  Male| null|5676599876|     2|
|  25|  Male|Rahim|      null|     3|
+----+------+-----+----------+------+



In [0]:
dfm=df.select("name","gender","age","phone")\
  .where("gender is not null")\
    .orderBy("age")
dfm.show()

+-----+------+----+----------+
| name|gender| age|     phone|
+-----+------+----+----------+
| null|  Male|null|5676599876|
|Satya|  Male|  25|8501099876|
|Rahim|  Male|  25|      null|
+-----+------+----+----------+



In [0]:
#nested_json="dbfs:/FileStore/users_nested.json"
display(spark.read.text("dbfs:/FileStore/users_nested.json"))

value
"{""userid"": 1, ""name"":""Satya"", ""gender"": ""Male"", ""age"":25, ""address"":{""city"":""Hyderabad"", ""state"":""TS""}}"
"{""gender"": ""Male"", ""userid"": 2, ""phone"": ""5676599876"", ""address"":{""city"":""Chennai"", ""state"":""TN""}}"
"{""name"":""Rahim"", ""userid"": 3, ""gender"": ""Male"", ""age"":25, ""address"":{""city"":""Bangalore"", ""state"":""KA""}}"
"{""userid"": 4, ""name"":""Sandra"", ""gender"": ""Female"",""age"":28, ""address"":{""city"":""Vijayawada"", ""state"":""AP""}}"
"{""userid"": 5, ""name"":""Keerti"", ""gender"": ""Female"", ""age"":15, ""address"":{""city"":""Mumbai"", ""state"":""MH""}}"
"{""userid"": 6, ""name"":""Sundar"", ""gender"": ""Male"", ""age"":13, ""address"":{""city"":""Hyderabad"", ""state"":""TS""} }"
"{""userid"": 7, ""name"":""Steve"", ""gender"": ""Male"", ""age"":28, ""address"":{""city"":""Hyderabad"", ""state"":""TS""} }"
"{""userid"": 8, ""name"":""Smriti"", ""gender"": ""Female"", ""age"":55, ""address"":{""city"":""Hyderabad"", ""state"":""TS""} }"


In [0]:
nested_json="dbfs:/FileStore/users_nested.json"

In [0]:
dfn=spark.read.json(nested_json)
dfn.show()

+----------------+----+------+------+----------+------+
|         address| age|gender|  name|     phone|userid|
+----------------+----+------+------+----------+------+
| {Hyderabad, TS}|  25|  Male| Satya|      null|     1|
|   {Chennai, TN}|null|  Male|  null|5676599876|     2|
| {Bangalore, KA}|  25|  Male| Rahim|      null|     3|
|{Vijayawada, AP}|  28|Female|Sandra|      null|     4|
|    {Mumbai, MH}|  15|Female|Keerti|      null|     5|
| {Hyderabad, TS}|  13|  Male|Sundar|      null|     6|
| {Hyderabad, TS}|  28|  Male| Steve|      null|     7|
| {Hyderabad, TS}|  55|Female|Smriti|      null|     8|
+----------------+----+------+------+----------+------+



In [0]:
adress_schema=StructType(
  [
    StructField("city",StringType(),True),
    StructField("state",StringType(),True)
  ]
)
user_schema=StructType(
  [
    StructField("age",LongType(),True),
    StructField("gender",StringType(),True),
    StructField("name",StringType(),True),
    StructField("userid",LongType(),True),
    StructField("address",adress_schema,True)
  ]
)

In [0]:
dfnn=spark.read.json(nested_json,schema=user_schema)
dfnn.show()

+----+------+------+------+----------------+
| age|gender|  name|userid|         address|
+----+------+------+------+----------------+
|  25|  Male| Satya|     1| {Hyderabad, TS}|
|null|  Male|  null|     2|   {Chennai, TN}|
|  25|  Male| Rahim|     3| {Bangalore, KA}|
|  28|Female|Sandra|     4|{Vijayawada, AP}|
|  15|Female|Keerti|     5|    {Mumbai, MH}|
|  13|  Male|Sundar|     6| {Hyderabad, TS}|
|  28|  Male| Steve|     7| {Hyderabad, TS}|
|  55|Female|Smriti|     8| {Hyderabad, TS}|
+----+------+------+------+----------------+



In [0]:
dfnn.select(dfnn["address"]).show()

+----------------+
|         address|
+----------------+
| {Hyderabad, TS}|
|   {Chennai, TN}|
| {Bangalore, KA}|
|{Vijayawada, AP}|
|    {Mumbai, MH}|
| {Hyderabad, TS}|
| {Hyderabad, TS}|
| {Hyderabad, TS}|
+----------------+



In [0]:
parquet_ipath="dbfs:/FileStore/flight-data/parquet/2010-summary.parquet"
parquet_oPath="dbfs:/FilesStore/output/parquet"

In [0]:
dfp=spark.read.format("parquet").load(parquet_ipath)
dfp.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

In [0]:
dfp.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME","count")\
  .where("count > 500")\
    .orderBy("DEST_COUNTRY_NAME")\
      .groupBy("DEST_COUNTRY_NAME").count().show()


+------------------+-----+
| DEST_COUNTRY_NAME|count|
+------------------+-----+
|           Germany|    1|
|            France|    1|
|     United States|   16|
|       South Korea|    1|
|            Mexico|    1|
|       The Bahamas|    1|
|           Jamaica|    1|
|            Canada|    1|
|            Brazil|    1|
|Dominican Republic|    1|
|             Japan|    1|
|       El Salvador|    1|
|          Colombia|    1|
|    United Kingdom|    1|
|       Netherlands|    1|
+------------------+-----+



In [0]:
dfp.write.format("parquet").mode("overwrite").save(parquet_oPath)
#dfp.write.mode("overwrite).parquet(parquet_oPath)
#dfp.write.parquet(parquet_oPath,mode="append")

In [0]:
df=spark.read.parquet(parquet_oPath)
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

In [0]:
%fs ls "dbfs:/FilesStore/output/parquet"

path,name,size,modificationTime
dbfs:/FilesStore/output/parquet/_SUCCESS,_SUCCESS,0,1731576825000
dbfs:/FilesStore/output/parquet/_committed_3610776615009723062,_committed_3610776615009723062,234,1731576825000
dbfs:/FilesStore/output/parquet/_committed_7066049879613340735,_committed_7066049879613340735,125,1719408364000
dbfs:/FilesStore/output/parquet/_committed_vacuum9091526829009548216,_committed_vacuum9091526829009548216,96,1731576825000
dbfs:/FilesStore/output/parquet/_started_3610776615009723062,_started_3610776615009723062,0,1731576824000
dbfs:/FilesStore/output/parquet/part-00000-tid-3610776615009723062-4ed19364-395b-47cf-a630-94a893326dfe-49-1-c000.snappy.parquet,part-00000-tid-3610776615009723062-4ed19364-395b-47cf-a630-94a893326dfe-49-1-c000.snappy.parquet,5488,1731576825000


In [0]:
display(spark.read.text("dbfs:/FilesStore/output/parquet/part-00000-tid-3610776615009723062-4ed19364-395b-47cf-a630-94a893326dfe-49-1-c000.snappy.parquet"))

value
PAR1���Ȑ�<����
h
���United States���Egypt	�quatorial Guinea
"���Costa Rica���Senegal���GuyanaAMalt�@Bolivia���Anguilla���Turks and Caicos Islands ���Saint Vincent $0the Grenadine�ItalyUPakistan�HIceland���MarshallX�$Luxembourg9 Honduras�he Baham	(El Salvador�Samo�Kazakhn5Switzerr�!�$Maarten	��`g Kongarinidad�Tobago!0Lat-Slovak)(Suriname""Mexico�Ecu��@Colombia���Norwa%"
Thai�}Venezuel#Panam%�Morocco�Antigua�Barbud� Azerbaija�New ZeaV�LiberiJHungary�Sweden
Israel�EthiopiS$Martinique!�)� Barthelem��adosdGermany!�Kyrgyz!oIr%�^Malays)�CypruE8Qatar���Fiji�QHKittIjNevis�Taiwan!�aitiKuwait
"CanadaHederay, of Micrones)pJamaica���Dominican Republic`JapI�Fin!FAruba"
`renchaX�ai5XIndia���British VirginQ��Brazil!I
>Poly��u�Arab Emi��sA SingaporeA�NetheE�%7Chin%�Denmark!BPeru8DArgentina���Cayma��!a(outh Africa�Spain�c� Aille%�Bermud�
"KiribatiHaudi��i	jzech 1H!�Belgium�Afghanu�Curacae""Georgia%Philipp�.?�<a"


In [0]:
orc_ipath="dbfs:/FileStore/flight-data/orc/2010-summary.orc"
orc_opath="dbfs:/FilesStore/output/orc"

In [0]:
dfo=spark.read.orc(orc_ipath)
#dfo=spark.read.format("orc").load(orc_ipath)
#dfo=spark.read.load(orc_ipath,format="orc",schema=myschema)
dfo.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

In [0]:
dfo.filter("count>=44  and count<=300").show()

+--------------------+--------------------+-----+
|   DEST_COUNTRY_NAME| ORIGIN_COUNTRY_NAME|count|
+--------------------+--------------------+-----+
|       United States|             Ireland|  264|
|       United States|               India|   69|
|       United States|             Grenada|   54|
|       United States|    Marshall Islands|   44|
|       United States|        Sint Maarten|   53|
|             Bolivia|       United States|   46|
|Turks and Caicos ...|       United States|  136|
|       United States|              Russia|  156|
|       United States|Federated States ...|   48|
|             Iceland|       United States|  118|
|    Marshall Islands|       United States|   77|
|          Luxembourg|       United States|   91|
|       United States|             Senegal|   46|
|        Sint Maarten|       United States|   61|
|           Hong Kong|       United States|  252|
| Trinidad and Tobago|       United States|  187|
|             Ecuador|       United States|  272|


In [0]:
dfo.write.orc(orc_opath,mode="overwrite")

In [0]:
df=spark.read.orc(orc_opath)
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

In [0]:
csv_ipath="dbfs:/FileStore/flight-data/csv/2010_summary.csv"
csv_opath="dbfs:/FilesStore/output/csv"

In [0]:
schema=StructType(
  [
    StructField("DEST_COUNTRY_NAME",StringType(),True),
    StructField("ORIGIN_COUNTRY_NAME",StringType(),True),
    StructField("count",LongType(),True)
  ]
)

In [0]:
#df=spark.read.format("csv").schema(schema).load(csv_ipath)
#df=spark.read.csv(csv_ipath,schema=schema)
#df=spark.read.load(csv_ipath,format="csv",schema=schema)
#df=spark.read.format("csv").option("inferSchema",True).option("header",True).load(csv_ipath)
df=spark.read.format("csv").load(csv_ipath,inferSchema=True,header=True)

In [0]:

df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

In [0]:
df2=df.select("ORIGIN_COUNTRY_NAME","count")\
  .where("count>100 and ORIGIN_COUNTRY_NAME='United States'")\
    .orderBy("ORIGIN_COUNTRY_NAME")
df2.show()

+-------------------+------+
|ORIGIN_COUNTRY_NAME| count|
+-------------------+------+
|      United States|   477|
|      United States|   136|
|      United States|   390|
|      United States|   118|
|      United States|   391|
|      United States|   903|
|      United States|   519|
|      United States|   315|
|      United States|   252|
|      United States|   187|
|      United States|  6200|
|      United States|   272|
|      United States|   785|
|      United States|   377|
|      United States|   355|
|      United States|   123|
|      United States|   117|
|      United States|348113|
|      United States|   130|
|      United States|  1392|
+-------------------+------+
only showing top 20 rows



In [0]:
#df2.write.format("csv").mode("overwrite").save(csv_opath,header=True)
#df2.write.csv(csv_opath,mode="overwrite",header=True)
#df2.write.format("csv").option("header",True).mode("overwrite").save(csv_opath)
df2.write.csv(csv_opath,mode="overwrite",sep="|")

In [0]:

df=spark.read.csv(csv_opath)
df.show()

+--------------------+
|                 _c0|
+--------------------+
|   United States|477|
|   United States|136|
|   United States|390|
|   United States|118|
|   United States|391|
|   United States|903|
|   United States|519|
|   United States|315|
|   United States|252|
|   United States|187|
|  United States|6200|
|   United States|272|
|   United States|785|
|   United States|377|
|   United States|355|
|   United States|123|
|   United States|117|
|United States|348113|
|   United States|130|
|  United States|1392|
+--------------------+
only showing top 20 rows



In [0]:
%fs ls "dbfs:/FilesStore/output/csv"

path,name,size,modificationTime
dbfs:/FilesStore/output/csv/_committed_2000670717476499313,_committed_2000670717476499313,199,1731578490000
dbfs:/FilesStore/output/csv/_committed_414599292412238781,_committed_414599292412238781,201,1719410378000
dbfs:/FilesStore/output/csv/_committed_5289793366483335652,_committed_5289793366483335652,203,1719409663000
dbfs:/FilesStore/output/csv/_committed_6078323842886581591,_committed_6078323842886581591,199,1731578437000
dbfs:/FilesStore/output/csv/_committed_6441141922822189752,_committed_6441141922822189752,199,1731578707000
dbfs:/FilesStore/output/csv/_committed_6973340340857908733,_committed_6973340340857908733,214,1719409341000
dbfs:/FilesStore/output/csv/_committed_7060119795122517406,_committed_7060119795122517406,200,1731578394000
dbfs:/FilesStore/output/csv/_committed_7755764742105732288,_committed_7755764742105732288,114,1719409267000
dbfs:/FilesStore/output/csv/_committed_895111782368127828,_committed_895111782368127828,202,1719409778000
dbfs:/FilesStore/output/csv/_committed_vacuum196108531690390214,_committed_vacuum196108531690390214,225,1731578395000


In [0]:
display(spark.read.text("dbfs:/FilesStore/output/csv/part-00000-tid-6441141922822189752-76b218ba-15ca-4d40-b4bb-9d84f6ffdebd-87-1-c000.csv"))

value
United States|477
United States|136
United States|390
United States|118
United States|391
United States|903
United States|519
United States|315
United States|252
United States|187


In [0]:
df=spark.read.csv(csv_opath,sep="|",header=True,inferSchema=True)
df.show()

+-------------+------+
|United States|   477|
+-------------+------+
|United States|   136|
|United States|   390|
|United States|   118|
|United States|   391|
|United States|   903|
|United States|   519|
|United States|   315|
|United States|   252|
|United States|   187|
|United States|  6200|
|United States|   272|
|United States|   785|
|United States|   377|
|United States|   355|
|United States|   123|
|United States|   117|
|United States|348113|
|United States|   130|
|United States|  1392|
|United States|   231|
+-------------+------+
only showing top 20 rows



In [0]:
display(spark.read.text(csv_opath))

value
United States|477
United States|136
United States|390
United States|118
United States|391
United States|903
United States|519
United States|315
United States|252
United States|187


In [0]:
display(spark.read.option("header",True).option("inferSchema",True).csv("dbfs:/FilesStore/output/csv"))

United States|477
United States|136
United States|390
United States|118
United States|391
United States|903
United States|519
United States|315
United States|252
United States|187
United States|6200


In [0]:
dfcv=spark.read.format("csv").load(csv_ipath,header=True).toDF("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME", "count")
dfcv.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

In [0]:
dfcv.show(5,False)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
text_path="dbfs:/FileStore/cts_data/wordcount.txt"

In [0]:
dft=spark.read.text(text_path)
dft.show(truncate=False)

+----------------------------------------------+
|value                                         |
+----------------------------------------------+
|spark is a general purpose execution framework|
|spark can run on hadoop                       |
|scala is preferred language for spark         |
|spark also supports java and python           |
|spark is a general purpose execution framework|
|spark can run on hadoop                       |
|scala is preferred language for spark         |
|spark also supports java and python           |
|python spark scala java pyspark hadoop        |
|spark python spark rdd rdd rdd sql spark      |
|pythom machine learning spark sql rdd rdd     |
+----------------------------------------------+



In [0]:
from pyspark.sql.functions import explode,split

In [0]:
df=dft.select(explode(split(dft["value"]," ")).alias("Words"))\
  .groupBy("Words").count()\
    .orderBy("Words")
df.show()

+---------+-----+
|    Words|count|
+---------+-----+
|        a|    2|
|     also|    2|
|      and|    2|
|      can|    2|
|execution|    2|
|      for|    2|
|framework|    2|
|  general|    2|
|   hadoop|    3|
|       is|    4|
|     java|    3|
| language|    2|
| learning|    1|
|  machine|    1|
|       on|    2|
|preferred|    2|
|  purpose|    2|
|  pyspark|    1|
|   pythom|    1|
|   python|    4|
+---------+-----+
only showing top 20 rows



In [0]:
df.select("Words").write.mode("overwrite").text("dbfs:/FilesStore/output/text")

In [0]:
df.printSchema()

root
 |-- Words: string (nullable = false)
 |-- count: long (nullable = false)



In [0]:
df=spark.read.text("dbfs:/FilesStore/output/text")
df.show(10,False)

+---------+
|value    |
+---------+
|a        |
|also     |
|and      |
|can      |
|execution|
|for      |
|framework|
|general  |
|hadoop   |
|is       |
+---------+
only showing top 10 rows



**DATAFARME** **TRANSFORMATION** **METHOD**

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [0]:
from pyspark.sql.functions import  when,lit,explode,split,col,asc,desc,avg,expr,count,max,sum,min

In [0]:
ListUsers=[(1, "Raju", 5),
            (2, "Ramesh", 15),
            (3, "Rajesh", 18),
            (4, "Raghu", 35),
            (5, "Ramya", 25),
            (6, "Radhika", 35),
            (7, "Ravi", 70)]

In [0]:
Schema=StructType(
  [StructField("id",LongType(),True),
   StructField("Name",StringType(),True),
   StructField("Age",StringType(),True)
   ]
)
        

In [0]:
#df=spark.createDataFrame(ListUsers)
df=spark.createDataFrame(data=ListUsers,schema=my_schema)
#df=spark.createDataFrame(ListUsers,["id","Name","Age"])
#df=spark.createDataFrame(ListUsers).toDF("id","Name","Age")
df.show()

+---+-------+---+
| Id|   Name|Age|
+---+-------+---+
|  1|   Raju|  5|
|  2| Ramesh| 15|
|  3| Rajesh| 18|
|  4|  Raghu| 35|
|  5|  Ramya| 25|
|  6|Radhika| 35|
|  7|   Ravi| 70|
+---+-------+---+



In [0]:
my_schema="Id int,Name string, Age int"

In [0]:
rdd=sc.parallelize(ListUsers,3)
rdd.collect()

Out[163]: [(1, 'Raju', 5),
 (2, 'Ramesh', 15),
 (3, 'Rajesh', 18),
 (4, 'Raghu', 35),
 (5, 'Ramya', 25),
 (6, 'Radhika', 35),
 (7, 'Ravi', 70)]

In [0]:
df=rdd.toDF(["Id","Name","Age"])
df.show()

+---+-------+---+
| Id|   Name|Age|
+---+-------+---+
|  1|   Raju|  5|
|  2| Ramesh| 15|
|  3| Rajesh| 18|
|  4|  Raghu| 35|
|  5|  Ramya| 25|
|  6|Radhika| 35|
|  7|   Ravi| 70|
+---+-------+---+



In [0]:
df.rdd.collect()

Out[166]: [Row(Id=1, Name='Raju', Age=5),
 Row(Id=2, Name='Ramesh', Age=15),
 Row(Id=3, Name='Rajesh', Age=18),
 Row(Id=4, Name='Raghu', Age=35),
 Row(Id=5, Name='Ramya', Age=25),
 Row(Id=6, Name='Radhika', Age=35),
 Row(Id=7, Name='Ravi', Age=70)]

In [0]:
Rdd=df.rdd
Rdd.getNumPartitions()

Out[167]: 3

In [0]:
Rdd.collect()

Out[168]: [Row(Id=1, Name='Raju', Age=5),
 Row(Id=2, Name='Ramesh', Age=15),
 Row(Id=3, Name='Rajesh', Age=18),
 Row(Id=4, Name='Raghu', Age=35),
 Row(Id=5, Name='Ramya', Age=25),
 Row(Id=6, Name='Radhika', Age=35),
 Row(Id=7, Name='Ravi', Age=70)]

In [0]:
datalist=[("vanitha,Hamsa",24,"ECE"),
          ("Punith",22,"BSC")]

In [0]:
column="Name string, Age int, Dept String"

In [0]:
#df=spark.createDataFrame(datalist,column)
df=spark.createDataFrame(datalist,["Name","Age","Dept"])
df.show()

+-------------+---+----+
|         Name|Age|Dept|
+-------------+---+----+
|vanitha,Hamsa| 24| ECE|
|       Punith| 22| BSC|
+-------------+---+----+



In [0]:
df.select(explode(split(df["Name"],",")).alias("Name"),df.Age,df.Dept).show()


+-------+---+----+
|   Name|Age|Dept|
+-------+---+----+
|vanitha| 24| ECE|
|  Hamsa| 24| ECE|
| Punith| 22| BSC|
+-------+---+----+



In [0]:
listdata=[("James,John","USA"),
          ("Robert,Michel","UK")]

In [0]:
df=spark.createDataFrame(listdata,["NAME","COUNTRY"])
df.show()

+-------------+-------+
|         NAME|COUNTRY|
+-------------+-------+
|   James,John|    USA|
|Robert,Michel|     UK|
+-------------+-------+



In [0]:
df.select(explode(split(df["NAME"],",")).alias("NAME"),df.COUNTRY).show()

+------+-------+
|  NAME|COUNTRY|
+------+-------+
| James|    USA|
|  John|    USA|
|Robert|     UK|
|Michel|     UK|
+------+-------+



In [0]:
datalist=[("vanitha Hamsa",24,"ECE")]
column="NAME STRING,AGE INT, DEPT STRING"

In [0]:
df=spark.createDataFrame(datalist,column)

In [0]:
df.select(explode(split(df["NAME"]," ")).alias("NAME"),df.AGE,df.DEPT).show()

+-------+---+----+
|   NAME|AGE|DEPT|
+-------+---+----+
|vanitha| 24| ECE|
|  Hamsa| 24| ECE|
+-------+---+----+



In [0]:
data=[("vanitha",("pyspark,python")),
      ("hamsa",("deveops,sql"))]

In [0]:
df=spark.createDataFrame(data,["NAME","SUBJECT"])
df.show()

+-------+--------------+
|   NAME|       SUBJECT|
+-------+--------------+
|vanitha|pyspark,python|
|  hamsa|   deveops,sql|
+-------+--------------+



In [0]:
df.select(df.NAME,explode(split(df["SUBJECT"],",")).alias("sub")).show()

+-------+-------+
|   NAME|    sub|
+-------+-------+
|vanitha|pyspark|
|vanitha| python|
|  hamsa|deveops|
|  hamsa|    sql|
+-------+-------+



In [0]:
rdd=sc.parallelize(["vanitha",
                    "punitha"])
rdd1=rdd.map(lambda x:(x,))
rdd1.collect()

Out[222]: [('vanitha',), ('punitha',)]

In [0]:
df=rdd1.toDF(["Name"])
df.show()

+-------+
|   Name|
+-------+
|vanitha|
|punitha|
+-------+



In [0]:
df=spark.createDataFrame(rdd1,["NAME"])
df.show()

+-------+
|   NAME|
+-------+
|vanitha|
|punitha|
+-------+



In [0]:
df=spark.createDataFrame([("vanitha","punith")],["Name"])
df.show()

+-------+------+
|   Name|    _2|
+-------+------+
|vanitha|punith|
+-------+------+



In [0]:
df=spark.createDataFrame([("vanitha","punith",)],["Name"])
df.show()

+-------+------+
|   Name|    _2|
+-------+------+
|vanitha|punith|
+-------+------+



In [0]:
df=spark.createDataFrame([("Vanitha,Punith",)],["Name"])
df.select(explode(split(df["Name"],",")).alias("name")).show()

+-------+
|   name|
+-------+
|Vanitha|
| Punith|
+-------+



In [0]:
df=spark.createDataFrame([("vanitha Punith",)],["NAME"])
df.select(explode(split(df["NAME"]," ")).alias("NAME")).show()

+-------+
|   NAME|
+-------+
|vanitha|
| Punith|
+-------+



In [0]:
samples=[(20,"vanitha",23),
         (21,"puni",21),
         (23,"shrusti",25),
         (24,"kavya",28),
         (25,"hamsa",24),
         (26,"thulsi",24)]

column="id int, name string, age int"

In [0]:
df=spark.createDataFrame(samples,schema=column)
df.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
| 20|vanitha| 23|
| 21|   puni| 21|
| 23|shrusti| 25|
| 24|  kavya| 28|
| 25|  hamsa| 24|
| 26| thulsi| 24|
+---+-------+---+



In [0]:
rdd=sc.parallelize(samples,3)
rdd.collect()

Out[241]: [(20, 'vanitha', 23),
 (21, 'puni', 21),
 (23, 'shrusti', 25),
 (24, 'kavya', 28),
 (25, 'hamsa', 24),
 (26, 'thulsi', 24)]

In [0]:
df=rdd.toDF(["ID","NAME","AGE"])
df.show()

+---+-------+---+
| ID|   NAME|AGE|
+---+-------+---+
| 20|vanitha| 23|
| 21|   puni| 21|
| 23|shrusti| 25|
| 24|  kavya| 28|
| 25|  hamsa| 24|
| 26| thulsi| 24|
+---+-------+---+



In [0]:
path_json="dbfs:/FileStore/cts_data/json/2010_summary.json"
dfj=spark.read.json(path_json)
dfj.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

In [0]:
df2=dfj.select(col("ORIGIN_COUNTRY_NAME").alias("origin"),
               expr("count+25 as Count"),
                  expr("DEST_COUNTRY_NAME ='United States' as Destination ")).show()


+----------------+-----+-----------+
|          origin|Count|Destination|
+----------------+-----+-----------+
|         Romania|   26|       true|
|         Ireland|  289|       true|
|           India|   94|       true|
|   United States|   49|      false|
|   United States|   26|      false|
|       Singapore|   50|       true|
|         Grenada|   79|       true|
|   United States|  502|      false|
|   United States|   54|      false|
|Marshall Islands|   69|       true|
|   United States|   42|      false|
|    Sint Maarten|   78|       true|
|   United States|   26|      false|
|   United States|   71|      false|
|   United States|   46|      false|
|   United States|  161|      false|
|     Afghanistan|   27|       true|
|   United States|   26|      false|
|   United States|  415|      false|
|          Russia|  181|       true|
+----------------+-----+-----------+
only showing top 20 rows



In [0]:
df3=dfj.select(col("DEST_COUNTRY_NAME").alias("destination"),
               col("ORIGIN_COUNTRY_NAME").alias("origin"),
               col("count").alias("Count"),
               expr("count+ 100 as NewCount"),
               expr("count > 500 as HighFrequency"),
               expr("ORIGIN_COUNTRY_NAME='United States' as ORIGIN")).show()

+--------------------+----------------+-----+--------+-------------+------+
|         destination|          origin|Count|NewCount|HighFrequency|ORIGIN|
+--------------------+----------------+-----+--------+-------------+------+
|       United States|         Romania|    1|     101|        false| false|
|       United States|         Ireland|  264|     364|        false| false|
|       United States|           India|   69|     169|        false| false|
|               Egypt|   United States|   24|     124|        false|  true|
|   Equatorial Guinea|   United States|    1|     101|        false|  true|
|       United States|       Singapore|   25|     125|        false| false|
|       United States|         Grenada|   54|     154|        false| false|
|          Costa Rica|   United States|  477|     577|        false|  true|
|             Senegal|   United States|   29|     129|        false|  true|
|       United States|Marshall Islands|   44|     144|        false| false|
|           

In [0]:
df1=dfj.filter(col("count")>500)\
  .withColumn("HighFrequency",col("count")>500)\
    .withColumn("Destination",expr("DEST_COUNTRY_NAME ='United States'")).show()

+------------------+-------------------+------+-------------+-----------+
| DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|HighFrequency|Destination|
+------------------+-------------------+------+-------------+-----------+
|     United States|        Netherlands|   570|         true|       true|
|       The Bahamas|      United States|   903|         true|      false|
|       El Salvador|      United States|   519|         true|      false|
|            Mexico|      United States|  6200|         true|      false|
|     United States|         Costa Rica|   501|         true|       true|
|          Colombia|      United States|   785|         true|      false|
|     United States|            Jamaica|   757|         true|       true|
|     United States|        The Bahamas|   959|         true|       true|
|     United States|              China|   505|         true|       true|
|     United States| Dominican Republic|  1150|         true|       true|
|     United States|      United State

In [0]:
df3=dfj.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME","count")\
  .filter("DEST_COUNTRY_NAME='United States' and count >300")\
    .withColumn("Count",col("count").cast("int"))\
      .withColumn("Destination",when(col("DEST_COUNTRY_NAME")=='United States', lit("USA")).otherwise(col("DEST_COUNTRY_NAME")))\
        .withColumn("Origin",expr("case when ORIGIN_COUNTRY_NAME='United States' then 'US' else ORIGIN_COUNTRY_NAME end")).show()

+-----------------+-------------------+------+-----------+------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| Count|Destination|            Origin|
+-----------------+-------------------+------+-----------+------------------+
|    United States|        Netherlands|   570|        USA|       Netherlands|
|    United States|            Ecuador|   345|        USA|           Ecuador|
|    United States|         Costa Rica|   501|        USA|        Costa Rica|
|    United States|          Guatemala|   333|        USA|         Guatemala|
|    United States|            Jamaica|   757|        USA|           Jamaica|
|    United States|          Venezuela|   341|        USA|         Venezuela|
|    United States|        The Bahamas|   959|        USA|       The Bahamas|
|    United States|              China|   505|        USA|             China|
|    United States| Dominican Republic|  1150|        USA|Dominican Republic|
|    United States|      United States|348113|        USA|      

In [0]:
df3=dfj.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME","count")\
  .filter("DEST_COUNTRY_NAME ='United States' and count > 750")\
    .withColumn("NewCount",expr("count+50"))\
    .withColumn("origin",when(col("ORIGIN_COUNTRY_NAME")=='United States', lit("USA")).otherwise(col("ORIGIN_COUNTRY_NAME")))\
      .withColumn("Dest",expr(" case when  DEST_COUNTRY_NAME= 'United States' then 'USS' else DEST_COUNTRY_NAME end")).show()

+-----------------+-------------------+------+--------+------------------+----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|NewCount|            origin|Dest|
+-----------------+-------------------+------+--------+------------------+----+
|    United States|            Jamaica|   757|     807|           Jamaica| USS|
|    United States|        The Bahamas|   959|    1009|       The Bahamas| USS|
|    United States| Dominican Republic|  1150|    1200|Dominican Republic| USS|
|    United States|      United States|348113|  348163|               USA| USS|
|    United States|             France|   776|     826|            France| USS|
|    United States|            Germany|  1406|    1456|           Germany| USS|
|    United States|             Mexico|  6220|    6270|            Mexico| USS|
|    United States|              Japan|  1307|    1357|             Japan| USS|
|    United States|     United Kingdom|  1503|    1553|    United Kingdom| USS|
|    United States|             Canada| 

In [0]:
dfj.withColumn("dest",when(col("DEST_COUNTRY_NAME")=='United States', lit("USA")).otherwise(col("DEST_COUNTRY_NAME"))).show()

+--------------------+-------------------+-----+--------------------+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|                dest|
+--------------------+-------------------+-----+--------------------+
|       United States|            Romania|    1|                 USA|
|       United States|            Ireland|  264|                 USA|
|       United States|              India|   69|                 USA|
|               Egypt|      United States|   24|               Egypt|
|   Equatorial Guinea|      United States|    1|   Equatorial Guinea|
|       United States|          Singapore|   25|                 USA|
|       United States|            Grenada|   54|                 USA|
|          Costa Rica|      United States|  477|          Costa Rica|
|             Senegal|      United States|   29|             Senegal|
|       United States|   Marshall Islands|   44|                 USA|
|              Guyana|      United States|   17|              Guyana|
|       United State

In [0]:
dfj.withColumn("Destination",col("DEST_COUNTRY_NAME")=='United States')\
  .withColumn("Count", expr("count * 2"))\
    .withColumn("origin",col("ORIGIN_COUNTRY_NAME")=='United States').show()

+--------------------+-------------------+-----+-----------+------+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|Count|Destination|origin|
+--------------------+-------------------+-----+-----------+------+
|       United States|            Romania|    2|       true| false|
|       United States|            Ireland|  528|       true| false|
|       United States|              India|  138|       true| false|
|               Egypt|      United States|   48|      false|  true|
|   Equatorial Guinea|      United States|    2|      false|  true|
|       United States|          Singapore|   50|       true| false|
|       United States|            Grenada|  108|       true| false|
|          Costa Rica|      United States|  954|      false|  true|
|             Senegal|      United States|   58|      false|  true|
|       United States|   Marshall Islands|   88|       true| false|
|              Guyana|      United States|   34|      false|  true|
|       United States|       Sint Maarten|  106|

In [0]:
#dfj.filter("count>500").show()
#dfj.where("count>500").show()
#dfj.filter((col("count")>500) & (col("DEST_COUNTRY_NAME")=='United States')).show()
dfj.filter("count>500 and DEST_COUNTRY_NAME=='United States'").show()

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|        Netherlands|   570|
|    United States|         Costa Rica|   501|
|    United States|            Jamaica|   757|
|    United States|        The Bahamas|   959|
|    United States|              China|   505|
|    United States| Dominican Republic|  1150|
|    United States|      United States|348113|
|    United States|        South Korea|   621|
|    United States|             France|   776|
|    United States|            Germany|  1406|
|    United States|             Mexico|  6220|
|    United States|              Japan|  1307|
|    United States|             Brazil|   578|
|    United States|     United Kingdom|  1503|
|    United States|             Canada|  8305|
|    United States|           Colombia|   832|
+-----------------+-------------------+------+



In [0]:
dfj.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME","count")\
  .filter("count>1000")\
    .withColumn("New_count",expr("count+100"))\
      .orderBy(desc("count")).show()

+------------------+-------------------+------+---------+
| DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|New_count|
+------------------+-------------------+------+---------+
|     United States|      United States|348113|   348213|
|     United States|             Canada|  8305|     8405|
|            Canada|      United States|  8271|     8371|
|     United States|             Mexico|  6220|     6320|
|            Mexico|      United States|  6200|     6300|
|    United Kingdom|      United States|  1629|     1729|
|     United States|     United Kingdom|  1503|     1603|
|     United States|            Germany|  1406|     1506|
|           Germany|      United States|  1392|     1492|
|             Japan|      United States|  1383|     1483|
|     United States|              Japan|  1307|     1407|
|     United States| Dominican Republic|  1150|     1250|
|Dominican Republic|      United States|  1109|     1209|
+------------------+-------------------+------+---------+



In [0]:
dfj.sort(col("count").desc()).show()

+------------------+-------------------+------+
| DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+------------------+-------------------+------+
|     United States|      United States|348113|
|     United States|             Canada|  8305|
|            Canada|      United States|  8271|
|     United States|             Mexico|  6220|
|            Mexico|      United States|  6200|
|    United Kingdom|      United States|  1629|
|     United States|     United Kingdom|  1503|
|     United States|            Germany|  1406|
|           Germany|      United States|  1392|
|             Japan|      United States|  1383|
|     United States|              Japan|  1307|
|     United States| Dominican Republic|  1150|
|Dominican Republic|      United States|  1109|
|            Brazil|      United States|   995|
|     United States|        The Bahamas|   959|
|       The Bahamas|      United States|   903|
|     United States|           Colombia|   832|
|          Colombia|      United States|

In [0]:
df2=dfj.select(col("DEST_COUNTRY_NAME").alias("Destination"),
          col("ORIGIN_COUNTRY_NAME").alias("origin"),
          col("count").alias("NewCount"),
          expr("count+10 as addedNewCount"),
          expr("count>500 as HighFrequency"),
          expr("DEST_COUNTRY_NAME =ORIGIN_COUNTRY_NAME as newcolumn"),
          (col("DEST_COUNTRY_NAME")=='ORIGIN_COUNTRY_NAME').alias("COUNT"))

In [0]:
#df3=df2.groupBy("Destination","HighFrequency").sum("NewCount").show()
#df3=df2.groupBy("Destination","HighFrequency").avg("NewCount").show()
df3=df2.groupBy("Destination","HighFrequency").max("NewCount").show()
  

+--------------------+-------------+-------------+
|         Destination|HighFrequency|max(NewCount)|
+--------------------+-------------+-------------+
|Federated States ...|        false|           46|
|                Cuba|        false|          243|
|      Czech Republic|        false|           21|
|           Australia|        false|          290|
|            Pakistan|        false|            9|
|          Martinique|        false|           23|
|         Philippines|        false|          132|
|               Qatar|        false|           41|
|             Ukraine|        false|           19|
|            Kiribati|        false|           17|
|              Taiwan|        false|          275|
|           Argentina|        false|          184|
|               Haiti|        false|          238|
|   Equatorial Guinea|        false|            1|
|              Uganda|        false|            2|
| Trinidad and Tobago|        false|          187|
|Bonaire, Sint Eus...|        f

In [0]:
df2.groupBy("Destination","HighFrequency")\
  .agg(count("NewCount").alias("totalCount"),
       sum("NewCount").alias("Totalsum"),
       avg("NewCount").alias("Average")).show()

+--------------------+-------------+----------+--------+-------+
|         Destination|HighFrequency|totalCount|Totalsum|Average|
+--------------------+-------------+----------+--------+-------+
|Federated States ...|        false|         1|      46|   46.0|
|                Cuba|        false|         1|     243|  243.0|
|      Czech Republic|        false|         1|      21|   21.0|
|           Australia|        false|         1|     290|  290.0|
|            Pakistan|        false|         1|       9|    9.0|
|          Martinique|        false|         1|      23|   23.0|
|         Philippines|        false|         1|     132|  132.0|
|               Qatar|        false|         1|      41|   41.0|
|             Ukraine|        false|         1|      19|   19.0|
|            Kiribati|        false|         1|      17|   17.0|
|              Taiwan|        false|         1|     275|  275.0|
|           Argentina|        false|         1|     184|  184.0|
|               Haiti|   

In [0]:
df2.groupBy("Destination","HighFrequency")\
  .agg(count("NewCount").alias("totalcount"),
       sum("NewCount").alias("sum")).show()

+--------------------+-------------+----------+---+
|         Destination|HighFrequency|totalcount|sum|
+--------------------+-------------+----------+---+
|Federated States ...|        false|         1| 46|
|                Cuba|        false|         1|243|
|      Czech Republic|        false|         1| 21|
|           Australia|        false|         1|290|
|            Pakistan|        false|         1|  9|
|          Martinique|        false|         1| 23|
|         Philippines|        false|         1|132|
|               Qatar|        false|         1| 41|
|             Ukraine|        false|         1| 19|
|            Kiribati|        false|         1| 17|
|              Taiwan|        false|         1|275|
|           Argentina|        false|         1|184|
|               Haiti|        false|         1|238|
|   Equatorial Guinea|        false|         1|  1|
|              Uganda|        false|         1|  2|
| Trinidad and Tobago|        false|         1|187|
|Bonaire, Si

In [0]:
dfj.limit(10).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
|    United States|          Singapore|   25|
|    United States|            Grenada|   54|
|       Costa Rica|      United States|  477|
|          Senegal|      United States|   29|
|    United States|   Marshall Islands|   44|
+-----------------+-------------------+-----+



In [0]:
df2.limit(5).show()

+-----------------+-------------+--------+-------------+-------------+---------+-----+
|      Destination|       origin|NewCount|addedNewCount|HighFrequency|newcolumn|COUNT|
+-----------------+-------------+--------+-------------+-------------+---------+-----+
|    United States|      Romania|       1|           11|        false|    false|false|
|    United States|      Ireland|     264|          274|        false|    false|false|
|    United States|        India|      69|           79|        false|    false|false|
|            Egypt|United States|      24|           34|        false|    false|false|
|Equatorial Guinea|United States|       1|           11|        false|    false|false|
+-----------------+-------------+--------+-------------+-------------+---------+-----+



In [0]:
dfj.selectExpr("count as Count",
               "count+100 as MultiCount",
               "DEST_COUNTRY_NAME='United States' as Domestic").show()

+-----+----------+--------+
|Count|MultiCount|Domestic|
+-----+----------+--------+
|    1|       101|    true|
|  264|       364|    true|
|   69|       169|    true|
|   24|       124|   false|
|    1|       101|   false|
|   25|       125|    true|
|   54|       154|    true|
|  477|       577|   false|
|   29|       129|   false|
|   44|       144|    true|
|   17|       117|   false|
|   53|       153|    true|
|    1|       101|   false|
|   46|       146|   false|
|   21|       121|   false|
|  136|       236|   false|
|    2|       102|    true|
|    1|       101|   false|
|  390|       490|   false|
|  156|       256|    true|
+-----+----------+--------+
only showing top 20 rows



In [0]:
dfj.withColumn("Count",col("count"))\
  .withColumn("Newcount",col("count")+10)\
      .withColumn("HighFrequency",col("count")>700)\
          .withColumn("destiny",expr("DEST_COUNTRY_NAME=ORIGIN_COUNTRY_NAME"))\
              .withColumnRenamed("DEST_COUNTRY_NAME","destination")\
                  .withColumnRenamed("ORIGIN_COUNTRY_NAME","orign")\
                      .withColumn("country",lit("INDIA")).show()

+--------------------+----------------+-----+--------+-------------+-------+-------+
|         destination|           orign|Count|Newcount|HighFrequency|destiny|country|
+--------------------+----------------+-----+--------+-------------+-------+-------+
|       United States|         Romania|    1|      11|        false|  false|  INDIA|
|       United States|         Ireland|  264|     274|        false|  false|  INDIA|
|       United States|           India|   69|      79|        false|  false|  INDIA|
|               Egypt|   United States|   24|      34|        false|  false|  INDIA|
|   Equatorial Guinea|   United States|    1|      11|        false|  false|  INDIA|
|       United States|       Singapore|   25|      35|        false|  false|  INDIA|
|       United States|         Grenada|   54|      64|        false|  false|  INDIA|
|          Costa Rica|   United States|  477|     487|        false|  false|  INDIA|
|             Senegal|   United States|   29|      39|        fal

In [0]:
dfj.withColumn("destiny",when(col("DEST_COUNTRY_NAME")=='United States',lit("INDIA")).otherwise("DESTINY"))\
  .withColumn("first",expr("case when  DEST_COUNTRY_NAME=ORIGIN_COUNTRY_NAME then 'ORIGIN' else ORIGIN_COUNTRY_NAME end ")).show()

+--------------------+-------------------+-----+-------+----------------+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|destiny|           first|
+--------------------+-------------------+-----+-------+----------------+
|       United States|            Romania|    1|  INDIA|         Romania|
|       United States|            Ireland|  264|  INDIA|         Ireland|
|       United States|              India|   69|  INDIA|           India|
|               Egypt|      United States|   24|DESTINY|   United States|
|   Equatorial Guinea|      United States|    1|DESTINY|   United States|
|       United States|          Singapore|   25|  INDIA|       Singapore|
|       United States|            Grenada|   54|  INDIA|         Grenada|
|          Costa Rica|      United States|  477|DESTINY|   United States|
|             Senegal|      United States|   29|DESTINY|   United States|
|       United States|   Marshall Islands|   44|  INDIA|Marshall Islands|
|              Guyana|      United Sta

In [0]:
listUsers = [(1, "Raju", 5),
             (1, "Raju", 5),
             (3, "Raju", 5),
             (4, "Raghu", 35),
             (4, "Raghu", 35),
             (6, "Raghu", 35),
             (7, "Ravi", 70)]

In [0]:
df=spark.createDataFrame(listUsers).toDF("Id","Name","Age")
df.show()

+---+-----+---+
| Id| Name|Age|
+---+-----+---+
|  1| Raju|  5|
|  1| Raju|  5|
|  3| Raju|  5|
|  4|Raghu| 35|
|  4|Raghu| 35|
|  6|Raghu| 35|
|  7| Ravi| 70|
+---+-----+---+



In [0]:
df.dropDuplicates().show()

+---+-----+---+
| Id| Name|Age|
+---+-----+---+
|  1| Raju|  5|
|  3| Raju|  5|
|  4|Raghu| 35|
|  6|Raghu| 35|
|  7| Ravi| 70|
+---+-----+---+



In [0]:
df.dropDuplicates().count()

Out[311]: 5

In [0]:
df.distinct().count()

Out[312]: 5

In [0]:
dfj.select("DEST_COUNTRY_NAME").count()

Out[313]: 255

In [0]:
dfj.select("DEST_COUNTRY_NAME").distinct().count()

Out[314]: 125

In [0]:
df.dropDuplicates(["Name","Age"]).show()

+---+-----+---+
| Id| Name|Age|
+---+-----+---+
|  1| Raju|  5|
|  4|Raghu| 35|
|  7| Ravi| 70|
+---+-----+---+



In [0]:
df.dropDuplicates(["Name","Age"]).count()

Out[316]: 3

In [0]:
df.distinct().count()

Out[317]: 5

In [0]:
rdd1=df.rdd.repartition(5)
rdd1.getNumPartitions()

Out[318]: 5

In [0]:
df.rdd.coalesce(3).getNumPartitions()

Out[320]: 3

In [0]:
listUsers = [(1, "Raju", 5),
             (2, "Ramesh", 75),
             (3, "Rajesh", 18),
             (4, "Raghu", 35),
             (5, "Ramya", 25),
             (6, "Radhika", 35),
             (7, "Ravi", 20)]

In [0]:
df=spark.createDataFrame(listUsers,["Id","Name","Age"])
df.show()

+---+-------+---+
| Id|   Name|Age|
+---+-------+---+
|  1|   Raju|  5|
|  2| Ramesh| 75|
|  3| Rajesh| 18|
|  4|  Raghu| 35|
|  5|  Ramya| 25|
|  6|Radhika| 35|
|  7|   Ravi| 20|
+---+-------+---+



In [0]:
df1=df.withColumn("agegroup",when(col("age")<7,"child")\
  .when(col("age")<25,"Teenage")\
    .when(col("age")<60,"Adult")\
      .otherwise("senior"))
              

In [0]:
df1.show()

+---+-------+---+--------+
| Id|   Name|Age|agegroup|
+---+-------+---+--------+
|  1|   Raju|  5|   child|
|  2| Ramesh| 75|  senior|
|  3| Rajesh| 18| Teenage|
|  4|  Raghu| 35|   Adult|
|  5|  Ramya| 25|   Adult|
|  6|Radhika| 35|   Adult|
|  7|   Ravi| 20| Teenage|
+---+-------+---+--------+



In [0]:
def age_grp(Age):
  if Age<7:
    return "Child"
  elif Age <25:
    return "Teenage"
  elif Age<60:
    return "Adult"
  else:
    return "Senior"

age_udf=udf(age_grp,returnType=StringType())
dfu=df1.withColumn("AGEGRP",age_udf(col("Age")))
dfu.show()


+---+-------+---+--------+-------+
| Id|   Name|Age|agegroup| AGEGRP|
+---+-------+---+--------+-------+
|  1|   Raju|  5|   child|  Child|
|  2| Ramesh| 75|  senior| Senior|
|  3| Rajesh| 18| Teenage|Teenage|
|  4|  Raghu| 35|   Adult|  Adult|
|  5|  Ramya| 25|   Adult|  Adult|
|  6|Radhika| 35|   Adult|  Adult|
|  7|   Ravi| 20| Teenage|Teenage|
+---+-------+---+--------+-------+



In [0]:
def age_grp(Age):
  if Age<7:
    return "Child"
  elif Age <25:
    return "Teenage"
  elif Age<60:
    return "Adult"
  else:
    return "Senior"

age_udf=udf(age_grp,returnType=StringType())
df2=df1.withColumn("Age_group",age_udf(col("Age")))
df2.show()

+---+-------+---+--------+---------+
| Id|   Name|Age|agegroup|Age_group|
+---+-------+---+--------+---------+
|  1|   Raju|  5|   child|    Child|
|  2| Ramesh| 75|  senior|   Senior|
|  3| Rajesh| 18| Teenage|  Teenage|
|  4|  Raghu| 35|   Adult|    Adult|
|  5|  Ramya| 25|   Adult|    Adult|
|  6|Radhika| 35|   Adult|    Adult|
|  7|   Ravi| 20| Teenage|  Teenage|
+---+-------+---+--------+---------+



In [0]:
@udf(returnType=StringType())
def age_grp(Age):
  if Age<7:
    return "Child"
  elif Age <25:
    return "Teenage"
  elif Age<60:
    return "Adult"
  else:
    return "Senior"
df_udf=df1.withColumn("AGE_GROUP",age_grp(col("Age")))
df_udf.show()

+---+-------+---+--------+---------+
| Id|   Name|Age|agegroup|AGE_GROUP|
+---+-------+---+--------+---------+
|  1|   Raju|  5|   child|    Child|
|  2| Ramesh| 75|  senior|   Senior|
|  3| Rajesh| 18| Teenage|  Teenage|
|  4|  Raghu| 35|   Adult|    Adult|
|  5|  Ramya| 25|   Adult|    Adult|
|  6|Radhika| 35|   Adult|    Adult|
|  7|   Ravi| 20| Teenage|  Teenage|
+---+-------+---+--------+---------+



In [0]:
df.createOrReplaceTempView("users")

In [0]:
def age_grp(Age):
  if Age<7:
    return "Child"
  elif Age <25:
    return "Teenage"
  elif Age<60:
    return "Adult"
  else:
    return "Senior"

age_udf=udf(age_grp,returnType=StringType())
spark.udf.register("age_udf",age_grp,returnType=StringType())

Out[354]: <function __main__.age_grp(Age)>

In [0]:
%sql
select *,age_udf(Age) as AGEGROUP from users

Id,Name,Age,AGEGROUP
1,Raju,5,Child
2,Ramesh,75,Senior
3,Rajesh,18,Teenage
4,Raghu,35,Adult
5,Ramya,25,Adult
6,Radhika,35,Adult
7,Ravi,20,Teenage


In [0]:
df2.drop("Age_group").show()

+---+-------+---+--------+
| Id|   Name|Age|agegroup|
+---+-------+---+--------+
|  1|   Raju|  5|   child|
|  2| Ramesh| 75|  senior|
|  3| Rajesh| 18| Teenage|
|  4|  Raghu| 35|   Adult|
|  5|  Ramya| 25|   Adult|
|  6|Radhika| 35|   Adult|
|  7|   Ravi| 20| Teenage|
+---+-------+---+--------+



In [0]:
df2.drop("agegroup","Age_group").show()

+---+-------+---+
| Id|   Name|Age|
+---+-------+---+
|  1|   Raju|  5|
|  2| Ramesh| 75|
|  3| Rajesh| 18|
|  4|  Raghu| 35|
|  5|  Ramya| 25|
|  6|Radhika| 35|
|  7|   Ravi| 20|
+---+-------+---+



In [0]:
df2.dropDuplicates(["Name"]).show()

+---+-------+---+--------+---------+
| Id|   Name|Age|agegroup|Age_group|
+---+-------+---+--------+---------+
|  6|Radhika| 35|   Adult|    Adult|
|  4|  Raghu| 35|   Adult|    Adult|
|  3| Rajesh| 18| Teenage|  Teenage|
|  1|   Raju|  5|   child|    Child|
|  2| Ramesh| 75|  senior|   Senior|
|  5|  Ramya| 25|   Adult|    Adult|
|  7|   Ravi| 20| Teenage|  Teenage|
+---+-------+---+--------+---------+



In [0]:
path="dbfs:/FileStore/users.json"
df=spark.read.json(path)
df.show()

+----+------+------+----------+------+
| age|gender|  name|     phone|userid|
+----+------+------+----------+------+
|  25|  Male| Satya|8501099876|     1|
|null|  Male| Vivek|5676599876|     2|
|  25|  Male| Rahim|      null|     3|
|  28|Female|Sandra|8508899001|     4|
|  15|Female|Keerti|      null|     5|
|  13|  Male|Sundar|8522233456|     6|
|  28|  Male| Steve|8501085009|     7|
|  55|Female|Smriti|9246655498|     8|
+----+------+------+----------+------+



In [0]:
df.dropna().show()

+---+------+------+----------+------+
|age|gender|  name|     phone|userid|
+---+------+------+----------+------+
| 25|  Male| Satya|8501099876|     1|
| 28|Female|Sandra|8508899001|     4|
| 13|  Male|Sundar|8522233456|     6|
| 28|  Male| Steve|8501085009|     7|
| 55|Female|Smriti|9246655498|     8|
+---+------+------+----------+------+



In [0]:
df.dropna(subset=["phone","age"]).show()

+---+------+------+----------+------+
|age|gender|  name|     phone|userid|
+---+------+------+----------+------+
| 25|  Male| Satya|8501099876|     1|
| 28|Female|Sandra|8508899001|     4|
| 13|  Male|Sundar|8522233456|     6|
| 28|  Male| Steve|8501085009|     7|
| 55|Female|Smriti|9246655498|     8|
+---+------+------+----------+------+



In [0]:
df.na.drop(subset=["phone"]).show()

+----+------+------+----------+------+
| age|gender|  name|     phone|userid|
+----+------+------+----------+------+
|  25|  Male| Satya|8501099876|     1|
|null|  Male| Vivek|5676599876|     2|
|  28|Female|Sandra|8508899001|     4|
|  13|  Male|Sundar|8522233456|     6|
|  28|  Male| Steve|8501085009|     7|
|  55|Female|Smriti|9246655498|     8|
+----+------+------+----------+------+



In [0]:
df.na.fill(99,subset=["age"]).na.fill('999999',subset=["phone"]).show()

+---+------+------+----------+------+
|age|gender|  name|     phone|userid|
+---+------+------+----------+------+
| 25|  Male| Satya|8501099876|     1|
| 99|  Male| Vivek|5676599876|     2|
| 25|  Male| Rahim|    999999|     3|
| 28|Female|Sandra|8508899001|     4|
| 15|Female|Keerti|    999999|     5|
| 13|  Male|Sundar|8522233456|     6|
| 28|  Male| Steve|8501085009|     7|
| 55|Female|Smriti|9246655498|     8|
+---+------+------+----------+------+



In [0]:
df.fillna(99,subset=["age"]).show()

+---+------+------+----------+------+
|age|gender|  name|     phone|userid|
+---+------+------+----------+------+
| 25|  Male| Satya|8501099876|     1|
| 99|  Male| Vivek|5676599876|     2|
| 25|  Male| Rahim|      null|     3|
| 28|Female|Sandra|8508899001|     4|
| 15|Female|Keerti|      null|     5|
| 13|  Male|Sundar|8522233456|     6|
| 28|  Male| Steve|8501085009|     7|
| 55|Female|Smriti|9246655498|     8|
+---+------+------+----------+------+



In [0]:
df.na.replace("Male","M",subset=["gender"]).na.replace("Female","F",subset=["gender"]).na.replace(25,250,subset=["age"]).show()

+----+------+------+----------+------+
| age|gender|  name|     phone|userid|
+----+------+------+----------+------+
| 250|     M| Satya|8501099876|     1|
|null|     M| Vivek|5676599876|     2|
| 250|     M| Rahim|      null|     3|
|  28|     F|Sandra|8508899001|     4|
|  15|     F|Keerti|      null|     5|
|  13|     M|Sundar|8522233456|     6|
|  28|     M| Steve|8501085009|     7|
|  55|     F|Smriti|9246655498|     8|
+----+------+------+----------+------+



In [0]:
ipath="dbfs:/FileStore/cts_data/csv/2015_summary.csv"

In [0]:
df1=spark.read.csv(ipath,inferSchema=True,header=True)
df1.show()


+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [0]:
df2=df1.where("count>1000")
df2.show()

+------------------+-------------------+------+
| DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+------------------+-------------------+------+
|            Mexico|      United States|  7140|
|     United States| Dominican Republic|  1420|
|     United States|      United States|370002|
|           Germany|      United States|  1468|
|            Canada|      United States|  8399|
|Dominican Republic|      United States|  1353|
|             Japan|      United States|  1548|
|     United States|            Germany|  1336|
|     United States|             Mexico|  7187|
|    United Kingdom|      United States|  2025|
|     United States|              Japan|  1496|
|     United States|     United Kingdom|  1970|
|     United States|             Canada|  8483|
|       South Korea|      United States|  1048|
+------------------+-------------------+------+



In [0]:
df3=df1.where("ORIGIN_COUNTRY_NAME='India'")
df3.show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|              India|   62|
+-----------------+-------------------+-----+



In [0]:
df5=df2.union(df3).show()

+------------------+-------------------+------+
| DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+------------------+-------------------+------+
|            Mexico|      United States|  7140|
|     United States| Dominican Republic|  1420|
|     United States|      United States|370002|
|           Germany|      United States|  1468|
|            Canada|      United States|  8399|
|Dominican Republic|      United States|  1353|
|             Japan|      United States|  1548|
|     United States|            Germany|  1336|
|     United States|             Mexico|  7187|
|    United Kingdom|      United States|  2025|
|     United States|              Japan|  1496|
|     United States|     United Kingdom|  1970|
|     United States|             Canada|  8483|
|       South Korea|      United States|  1048|
|     United States|              India|    62|
+------------------+-------------------+------+



In [0]:
path="dbfs:/FileStore/flight-data/csv/2013_summary.csv"

In [0]:
df=spark.read.csv(path,header=True)
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   12|
|       United States|            Croatia|    1|
|       United States|            Ireland|  266|
|               Egypt|      United States|   13|
|       United States|              India|   60|
|   Equatorial Guinea|      United States|    1|
|       United States|              Niger|    1|
|       United States|          Singapore|   22|
|       United States|            Grenada|   40|
|          Costa Rica|      United States|  509|
|             Senegal|      United States|   28|
|              Guyana|      United States|   34|
|       United States|       Sint Maarten|  260|
|       United States|   Marshall Islands|   33|
|             Bolivia|      United States|   33|
|            Anguilla|      United States|   22|
|       United States|           Paraguay|   15|
|             Algeri