In [None]:
# https://colab.research.google.com/drive/1vkJbVcmsXAoNTpRvvwAK4fbA49bjS7He
# Imports
from pyspark.sql import SparkSession
# NOTE: the module is `functions` (plural); `pyspark.sql.function` does not exist.
# The star import is kept because every later cell uses bare names (col, when, lag, ...),
# but be aware it shadows Python builtins such as sum/max/min/round/abs.
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

# Create (or reuse an already-running) SparkSession
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)

In [None]:
# List the working directory (oldest first) to confirm the data file is present
!ls -lrt
# Peek at the first lines of the raw file; presumably this is the `|`-delimited file loaded below — confirm
!head tickets.dat

In [None]:
# load data
# NOTE(review): assumes the pipe-delimited file inspected above is the one to load — confirm path
dat_file = "tickets.dat"
df1 = spark.read.csv(dat_file, sep="|", inferSchema=True, header=True)
df = df1  # every later cell operates on `df`

# Other readers (each needs a path; uncomment and point at a real file to use):
# spark.read.json("path/to/file.json")
# spark.read.text("path/to/file.txt")
# spark.read.load("path/to/dir", format="parquet")   # or spark.read.parquet(...)

In [None]:
# inspect data
# Only a cell's last expression is rendered, so print each result explicitly.
df.printSchema()
print(df.head())              # first Row (head() with no argument)
print(df.first())             # first row
df.distinct().show()          # distinct() alone only builds a lazy plan; show() runs it
print(df.count())
print(df.distinct().count())

In [None]:
# filter
df.filter(df.age > 24).show()
# between() is inclusive on both bounds
df.filter(df.age.between(22, 44)).show()
# in
df.filter(col('languages').isin('S', 'A')).show()
# not in
df.filter(~col('languages').isin('S', 'A')).show()
# contains (substring match)
df.filter(df.location.contains('google.com')).show()

In [None]:
# remove fully-duplicated rows (all columns compared); drop_duplicates is the alias of dropDuplicates
df = df.drop_duplicates()

In [None]:
# add a derived column
# The operands must be Columns: 'mark1' + 'mark2' in plain Python is just the string
# 'mark1mark2', which withColumn rejects.
df.withColumn('total', col('mark1') + col('mark2') + col('mark3')).show()

In [None]:
# select
df.select('mark1', 'mark2').show()
df.select(df.age, df['age'] + 1).show()

# when (numerical) like CASE WHEN in SQL.
# Branches are tested in order, so the narrower condition (> 5) must come first;
# otherwise every age > 5 already matches "> 3" and bucket 2 is unreachable.
bucket_expr = when(df.age > 5, 2).when(df.age > 3, 1).otherwise(3)
df.select(bucket_expr).show()
cols = ('age', 'name')
# Bind to a new name: reassigning `df` here would drop `age`/`name` for every later cell.
df_bucketed = df.withColumn('bucket', bucket_expr).drop(*cols)
df_bucketed.show()

# startswith / endswith
df.select(col('firstName').startswith('Sm')).show()
df.select(col('firstName').endswith('ith')).show()

# substring (1-based start position, length 3)
# col() is used because attribute access (df.firstname) is case-sensitive and the
# column is referenced as 'firstName' above.
df.select(col('firstName').substr(1, 3).alias('short')).show()

In [None]:
# join df
# `how` takes a string; bare `inner` was an undefined name.
# NOTE(review): df2 is not defined in this notebook — create it before running this cell.
# With an expression condition both `name` columns are kept; pass on='name' to keep one.
df.join(df2, df.name == df2.name, how='inner').show()

In [None]:
# union df: append the rows of df2 to df.
# Columns are matched by position, not by name, so both frames need the same column order
# (unionByName matches by name instead).
(
    df
    .union(df2)
)

In [None]:
# explode: one output row per element of the `subjects` array
df.select('name', explode('subjects')).show(truncate=False)
# flatten: merge an array of arrays into a single array (one row per input row)
df.select('name', flatten('subjects')).show(truncate=False)

In [None]:
# rename a column (no-op if the existing column is not present)
df = df.withColumnRenamed(existing='Mark2', new='Mark22')

In [None]:
# missing values
# fillna/dropna are the DataFrame aliases of na.fill/na.drop; per the PySpark docs an
# int fill value only touches type-compatible (numeric) columns.
df.fillna(50)
df.dropna()

In [None]:
# replace
# na.replace(to_replace, value, subset): subset must be a column name or a list of names.
# Passing the `col` function raised an error; omitting subset applies to all compatible columns.
df.na.replace(10, 20).show()
df.na.replace(['Tom', 'Amy'], ['T', 'A'], 'name').show()

# regex replace: expand a trailing 'Rd' to 'Road'.
# The pattern is anchored with $ so an 'Rd' elsewhere in the string is untouched, and
# .otherwise keeps non-matching addresses (a bare when() would turn them into null).
df.withColumn(
    'address',
    when(df.address.endswith('Rd'), regexp_replace(df.address, 'Rd$', 'Road'))
    .otherwise(df.address),
).show()

In [None]:
# group by: number of rows per distinct age, in a column named `count`
df.groupBy('age').count().show()

In [None]:
# partition by / order by
windowSpec = Window.partitionBy("department").orderBy("salary")
df.withColumn("lag", lag("salary", 2).over(windowSpec)).show()

# window functions: every one needs a window spec in .over(...); an empty over() raises.
# ntile(n) splits each ordered partition into n roughly equal buckets and returns the
# 1-based bucket number.
df.select(
    "department",
    "salary",
    row_number().over(windowSpec).alias("row_number"),
    rank().over(windowSpec).alias("rank"),
    dense_rank().over(windowSpec).alias("dense_rank"),
    percent_rank().over(windowSpec).alias("percent_rank"),
    lag("age", 3).over(windowSpec).alias("lag_age_3"),
    lead("age", 3).over(windowSpec).alias("lead_age_3"),
    ntile(2).over(Window.partitionBy("department").orderBy("salary")).alias("ntile_2"),
).show()

In [None]:
# sort descending by age
# `ascending` must be a bool (or list of bools); the string 'False' is not accepted.
df.sort('age', ascending=False).show()

In [None]:
# sql query
df.createOrReplaceTempView("customer")
# Keep the DataFrame: .show() returns None, so chaining it here would bind df5 to None.
df5 = spark.sql("SELECT * FROM customer")
df5.show()

In [None]:
# convert to different data structures
# Only the last expression (df.rdd) is rendered in the cell output.
df.toPandas()  # per the PySpark docs this loads every row into driver memory — use on small data only
df.toJSON()  # per the PySpark docs: an RDD of JSON strings, one per row
df.rdd  # the underlying RDD of Row objects

In [None]:
# write & save
# mode='overwrite' keeps the cell re-runnable (the default mode errors if the path exists).
df.select('age').write.save('age.parquet', format='parquet', mode='overwrite')
# The format name is 'json' (not 'jason').
df.select('age').write.save('age.json', format='json', mode='overwrite')

In [None]:
# handling sentences: count space-separated tokens in `sentence`
# withColumn (there is no writeColumn); split() takes the column AND the delimiter pattern.
# NOTE: consecutive spaces yield empty tokens that are counted; use r'\s+' (after trim) if needed.
df.withColumn('word_count', size(split(col('sentence'), ' '))).show()

In [None]:
# datetime: split the `datetime` column into year / month / day-of-month columns
df.select(
    year(col("datetime")).alias('year'),
    month(col("datetime")).alias('month'),
    dayofmonth(col("datetime")).alias('day'),
).show()