## PySpark Commands

### Launch PySpark

#### Launch PySpark with Yarn

In [None]:
# Start the interactive PySpark shell with YARN as the master.
pyspark --master yarn
        
# If two versions of Spark are installed, use "pyspark2 --master yarn" to launch the Spark 2.x shell.

#### Launch PySpark with Avro Package

In [None]:
# Start the PySpark shell on YARN and load the spark-avro package through --packages.
# NOTE(review): the coordinate looks like spark-avro 2.4.0 built for Scala 2.11 — confirm it matches the cluster's Spark/Scala versions.
pyspark --master yarn \
--packages org.apache.spark:spark-avro_2.11:2.4.0

#### Launch PySpark with Avro Package and MySQL connectivity

In [None]:
# Start the PySpark shell on YARN with the spark-avro package plus the MySQL JDBC connector jar.
# The same connector jar path is passed to both --jars and --driver-class-path.
pyspark --master yarn \
--packages org.apache.spark:spark-avro_2.11:2.4.0 \
--jars /usr/share/java/mysql-connector-java.jar \
--driver-class-path /usr/share/java/mysql-connector-java.jar

### Create Spark Session

In [None]:
from pyspark.sql import SparkSession

# Build a session, or reuse one that is already running (getOrCreate).
# NOTE(review): "spark.some.config.option" looks like a placeholder — replace it with real settings.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

### DataFrame Operations

#### Create data frame from RDD

In [None]:
# Turn an existing RDD into a DataFrame.
# NOTE(review): `rdd` is not defined in the visible cells — it must already exist in the session.
df = spark.createDataFrame(data=rdd)

#### Summary statistics of the dataframe (count, mean, stddev, min, max)

In [None]:
# describe() only builds a new DataFrame holding the summary statistics;
# show() is needed to actually compute and print them.
df.describe().show()

#### Print columns and datatype of dataframe in tree format

In [None]:
# Print the column names and data types of df in tree format.
df.printSchema()

#### Get number of records

In [None]:
# Count the records once, keep the result, and let the cell display it.
record_count = df.count()
record_count

#### Preview records

In [None]:
n = 10  # Number of rows to preview; df.show() without arguments displays 20 rows by default

df.show(n)

# Another way is

df.show(5, truncate=False)

# 5 is the number of rows to display
# truncate=False displays the full content of each column instead of truncating long values

#### Collect the data into an array (a Python list of rows)

In [None]:
# Return the records of df as a list (of Row objects, per the PySpark API docs).
# NOTE(review): collect() pulls the whole data frame back to the driver — consider df.take(n) or df.limit(n) for large data.
df.collect()

#### Register dataframe as temporary table and perform query against it

In [None]:
spark.catalog.dropTempView('view_name')       # Drop any leftover view so createTempView does not fail when this cell is re-run

df.createTempView('view_name')                # This will create a Temporary View; it raises an error if the view already exists

df.createOrReplaceTempView('view_name')       # This will create the Temporary View, or replace it if it already exists

# Temporary views are scoped to the current Spark session (they are not stored in Hive);
# once the session is closed the views are gone

new_df = spark.sql('select * from view_name') # Perform queries against the created view

new_df.show()                                 # Display the results of the query


#### Change all column names

In [None]:
# New names, one per existing column and in the same order (this data frame has two columns)
renamed_columns = ['column_name_1', 'column_name_2']
new_df = df.toDF(*renamed_columns)

# Verify the changes
new_df.printSchema()

#### Change specific column name

In [None]:
# Rename one specific column
old_name, new_name = 'old_column_name', 'new_column_name'
new_df = df.withColumnRenamed(old_name, new_name)

# Verify the changes
new_df.printSchema()

#### Add new column from old column

In [None]:
from pyspark.sql.functions import substring          # Import only what is used; `import *` would shadow Python built-ins such as sum, min and max

# Add a new column holding the first 6 characters of the existing column (substring positions start at 1)
new_df = df.withColumn('new_column', substring(df.old_column, 1, 6))

new_df.printSchema()                                 # Verify the changes in the structure of the data frame

new_df.show()                                        # Confirm the records by displaying them

#### Change datatype of column

In [None]:
new_df = df.withColumn('column_name', df.column_name.cast('int'))  # Change the column data type to integer using a type-name string

# Or pass a DataType instance

from pyspark.sql.types import IntegerType
new_df = df.withColumn("column_name", df.column_name.cast(IntegerType()))

# Or select the column with bracket syntax
# Note: cast() needs an instance, IntegerType(), not the class IntegerType — passing the class raises a TypeError

newDF = df.withColumn("column_name", df["column_name"].cast(IntegerType()))

#### Configure the number of partitions used when shuffling data for joins or aggregations

In [None]:
# Partition count applied when data is shuffled for joins or aggregations
shuffle_partitions = "300"
spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions)

#### Set file compression

In [None]:
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")  # Compression codec used when writing Parquet files in this Spark session

spark.conf.set("spark.sql.orc.compression.codec", "snappy")      # Compression codec used when writing ORC files
                                                                  # Other formats follow the same pattern, e.g. spark.sql.avro.compression.codec

### Date Formats

In [None]:
Sample Output                 MySQL                             Oracle                                 Spark

2013-02-14          DATE_FORMAT(NOW(), '%Y-%m-%d')      TO_CHAR(SYSDATE, 'YYYY-MM-DD')          DATE_FORMAT(NOW(), 'y-MM-dd')

14/02/13            DATE_FORMAT(NOW(), '%d/%m/%y')      TO_CHAR(SYSDATE, 'DD/MM/RR')            DATE_FORMAT(NOW(), 'dd/MM/yy')
                                
14-February-13      DATE_FORMAT(NOW(), '%d-%M-%y')      TO_CHAR(SYSDATE, 'DD-MONTH-RR')         DATE_FORMAT(NOW(), 'dd-MMMM-yy')
                                                                                                DATE_FORMAT(NOW(), 'yyyy-MMMM-dd-E') gives 2011-March-22-Thu
14/02/13 15:35:22   DATE_FORMAT(NOW(), '%d/%m/%y %T')   TO_CHAR(SYSDATE, 'DD/MM/RR HH24:MI:SS') DATE_FORMAT(NOW(), 'dd/MM/yy HH:mm:ss')
                                                                                                                             hh - 12 hr
                                                                                                                             HH - 24 hr

#### Convert a date stored as bigint to date format (e.g. when data imported through Sqoop as an Avro file is loaded into a dataframe, the date column is a bigint)

In [None]:
# Assumes date_column holds epoch milliseconds (hence the /1000 to get seconds for from_unixtime).
# The query needs a FROM clause: replace view_name with the temporary view registered for the data frame.
# The cast uses bigint so that dates after 2038 do not overflow a 32-bit int.
spark.sql('select to_date(from_unixtime(cast(date_column/1000 as bigint))) as column_name from view_name').show()

# Date alternative for this case: format the value as a 'dd-MM-yyyy' string instead of converting it to a date
spark.sql("select date_format(from_unixtime(cast(date_column/1000 as bigint)), 'dd-MM-yyyy') as column_name from view_name").show()