In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

# Attach to the already-running Spark session if there is one; otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

In [2]:
from pyspark.sql.types import StringType , IntegerType , StructField , StructType 

In [6]:
# Programmatic schema: three fields, each declared non-nullable (third argument False).
schema = StructType([
    StructField(field_name, field_type, False)
    for field_name, field_type in (
        ('Author', StringType()),
        ('Title', StringType()),
        ('PagesNum', IntegerType()),
    )
])

In [7]:
# Alternative: declare the same schema as a DDL-formatted string (NOT NULL marks a non-nullable field).
# schema = 'Author STRING NOT NULL, Title STRING NOT NULL, PagesNum INT NOT NULL'

In [12]:
# Column names and types, written as a DDL string.
myschema = " id INT ,First STRING  ,Last STRING , URL STRING , Published STRING ,hits INT"

# One inner list per row; values follow the column order declared in myschema.
data = [
    [1, 'ahmed', 'samy', 'ahmed.samy@x.com', 'my name is ahmed', 254],
    [2, 'hossam', 'samy', 'hossam.samy@y.com', 'my name is hossam', 879],
    [3, 'Ali', 'samy', 'ali.samy@z.com', 'my name is ali', 452],
]

df = spark.createDataFrame(data, schema=myschema)
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- URL: string (nullable = true)
 |-- Published: string (nullable = true)
 |-- hits: integer (nullable = true)



In [13]:
# Print the rows of the DataFrame built above as a text table.
df.show()

+---+------+----+-----------------+-----------------+----+
| id| First|Last|              URL|        Published|hits|
+---+------+----+-----------------+-----------------+----+
|  1| ahmed|samy| ahmed.samy@x.com| my name is ahmed| 254|
|  2|hossam|samy|hossam.samy@y.com|my name is hossam| 879|
|  3|   Ali|samy|   ali.samy@z.com|   my name is ali| 452|
+---+------+----+-----------------+-----------------+----+



In [14]:
from pyspark.sql.functions import *  # NOTE(review): wildcard kept; later cells rely on names it provides (avg, col)

# expr() parses a SQL expression string. The expression doubles `hits`, so the
# alias names the column for what it actually contains.
df.select(expr('hits * 2').alias('hits Doubled')).show()
# Tip: pass truncate=False to show() to print full cell values instead of shortened ones.

+------------+
|hits Squared|
+------------+
|         508|
|        1758|
|         904|
+------------+



In [16]:
# Load the sample file: the first line supplies the column names and the
# column types are inferred from the data.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('NullData.csv')
)
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [17]:
# thresh=3: keep a row only if at least 3 of its values are non-null.
df.dropna(thresh=3).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [18]:
# thresh=2: keep a row only if at least 2 of its values are non-null.
df.dropna(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [19]:
# thresh=1: a single non-null value is enough for a row to be kept.
df.dropna(thresh=1).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [21]:
# Only the Sales column is inspected: rows whose Sales is null are removed.
df.dropna(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [23]:
# Rows with a null in either Name or Sales are removed; Id is not inspected.
df.dropna(subset=['Name', 'Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [31]:
def fill_with_mean(df, exclude=None):
    """Fill nulls in each non-excluded column with that column's mean.

    Parameters
    ----------
    df : DataFrame
        Frame whose null values should be replaced.
    exclude : container of str, optional
        Column names to leave untouched (for example identifier or text
        columns). Defaults to excluding nothing.

    Returns
    -------
    DataFrame
        A frame with nulls replaced by the per-column mean. ``df`` itself is
        returned when there is nothing to fill: every column is excluded, or
        no remaining column has a computable mean.
    """
    excluded = () if exclude is None else exclude
    targets = [c for c in df.columns if c not in excluded]
    if not targets:
        # agg() refuses an empty list of expressions.
        return df

    means = df.agg(*(avg(c).alias(c) for c in targets)).first().asDict()

    # A mean is None when the column holds no non-null values to average;
    # na.fill() rejects a null replacement, so those columns are skipped.
    fill_values = {name: mean for name, mean in means.items() if mean is not None}
    if not fill_values:
        return df
    return df.na.fill(fill_values)

# Id and Name are excluded, so Sales is the only column that gets averaged and filled.
fill_with_mean(df, exclude=["Id", "Name"]).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [34]:
datapath = 'sf-fire-calls.csv'
# The header row supplies the column names; column types are inferred from the data.
df_fire = spark.read.csv(
    datapath,
    header=True,
    inferSchema=True,
)
# Preview only the first three rows.
df_fire.show(n=3)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

In [35]:
# Print the column names and inferred types of the fire-calls DataFrame as a tree.
df_fire.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [40]:
# Neighborhood and Location for rows whose Neighborhood is anything other than
# 'Mission', printed without truncating the cell values.
outside_mission = col('Neighborhood') != 'Mission'
(
    df_fire
    .select('Neighborhood', 'Location')
    .where(outside_mission)
    .show(truncate=False)
)

+------------------------------+-------------------------------------+
|Neighborhood                  |Location                             |
+------------------------------+-------------------------------------+
|Pacific Heights               |(37.7895840679362, -122.428071912459)|
|Bayview Hunters Point         |(37.7337623673897, -122.396113802632)|
|Tenderloin                    |(37.7811772186856, -122.411699931232)|
|Bernal Heights                |(37.7388432849018, -122.423948785199)|
|Western Addition              |(37.7872890372638, -122.424236212664)|
|Financial District/South Beach|(37.7886866619654, -122.392722833778)|
|Oceanview/Merced/Ingleside    |(37.7140353531157, -122.454117149916)|
|Tenderloin                    |(37.7826266328595, -122.41915582123) |
|Japantown                     |(37.784958590666, -122.431435274503) |
|Castro/Upper Market           |(37.7618954753708, -122.437298717721)|
|Excelsior                     |(37.7105545807996, -122.443335369545)|
|Nob H

In [46]:
airline_data = 'departuredelays.csv'
# Generic reader API: choose the format, set the options, then load the path.
df_air = (
    spark.read
    .format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .load(airline_data)
)
df_air.show(3)

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
+-------+-----+--------+------+-----------+
only showing top 3 rows



In [49]:
# Register the DataFrame as a temporary view so it can be queried with ordinary SQL.
df_air.createOrReplaceTempView('us_flight_delay')
delay_query = """select * from us_flight_delay 
              where delay=6 """
spark.sql(delay_query).show()

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1311525|    6|     137|   ABI|        DFW|
|1090700|    6|     137|   ABI|        DFW|
|1181400|    6|     137|   ABI|        DFW|
|1241520|    6|     494|   ABQ|        DFW|
|1021326|    6|    1103|   ABQ|        ATL|
|1031326|    6|    1103|   ABQ|        ATL|
|1031440|    6|     852|   ABQ|        MSP|
|1310545|    6|     589|   ABQ|        LAX|
|1231635|    6|     589|   ABQ|        LAX|
|1060955|    6|     504|   ABQ|        DAL|
|1070830|    6|     660|   ABQ|        HOU|
|1081305|    6|     423|   ABQ|        LAS|
|1081915|    6|     504|   ABQ|        DAL|
|1091620|    6|     285|   ABQ|        PHX|
|1091155|    6|     589|   ABQ|        LAX|
|1101415|    6|     660|   ABQ|        HOU|
|1101315|    6|     285|   ABQ|        PHX|
|1110935|    6|     660|   ABQ|        HOU|
|1130920|    6|     285|   ABQ| 

In [None]:
# Standard way to read a DataFrame from a file: the generic reader works for any supported format (csv, json, parquet, ...)
# .schema(schema)
#new_path = r'C:\Users\SAMY\Desktop\Spark\Day2'
#df = spark.read.format('csv').option('header','True').option('inferSchema' ,'True').load(airline_data)
#df.write.format('parquet').mode('overwrite').save(new_path)