In [None]:
# Setup (one-time, in the kernel's environment): install PySpark and, optionally, findspark.
# pip install pyspark
# pip install findspark
# pip show pyspark

In [None]:
# findspark is typically only needed when PySpark is not importable from the kernel's environment
#import findspark
#findspark.init()
import pyspark
from pyspark.sql import SparkSession
# local[*] runs Spark locally with one worker thread per logical core;
# getOrCreate() reuses the session if one is already running in this kernel.
spark = SparkSession.builder.master("local[*]").appName("lab").getOrCreate()

In [None]:
print(f'The PySpark {spark.version} version is running...')

In [None]:
# NOTE: importing `pow` and `round` from pyspark.sql.functions shadows Python's built-ins
# of the same name for the rest of the notebook (later cells rely on the Spark versions).
from pyspark.sql.functions import col,column,expr,lit, instr, pow, round, bround
from pyspark.sql import Row 
from pyspark.sql.types import StructType,StructField,StringType,LongType

In [None]:
# To Spark (in Python or R), there is no such thing as a Dataset: everything is a DataFrame.
# spark.range(n) builds a DataFrame with a single `id` column holding 0..n-1.
spark.range(2)

In [None]:
# collect() is an action: it brings all rows back to the driver as a list of Row objects
spark.range(2).collect()

In [None]:
# Load the 2015 flight-summary data; the schema is inferred from the JSON records.
# NOTE(review): absolute, machine-specific path — adjust for your environment.
df = spark.read \
        .format("json") \
        .load("/Applications/MAMP/htdocs/Spark-The-Definitive-Guide/data/flight-data/json/2015-summary.json")

In [None]:
# The DataFrame repr shows only column names and types; no data is computed
df

In [None]:
# The StructType describing every column (name, type, nullability)
df.schema

In [None]:
df.printSchema()

In [None]:
# collect() materializes every row on the driver: fine for this small file, risky for large data
listOfRowObjects = df.collect()

In [None]:
type(listOfRowObjects)

In [None]:
# Number of rows in the file
len(listOfRowObjects)

In [None]:
# Each element is a pyspark.sql.Row
type(listOfRowObjects[0])

In [None]:
df = spark.createDataFrame([(1, (2,2))], ["a", "b"])
df.show()
df.printSchema(1)
df.printSchema(2)

In [None]:
df

In [None]:
df.schema

In [None]:
from pyspark.sql.functions import col,column,expr

In [None]:
df.a

In [None]:
df['a']

In [None]:
df.select(df.a).show()

In [None]:
df.select(df['a']).show()

In [None]:
df.select(col('a')).show()

In [None]:
# An expression created via the `expr` function is just a DataFrame column reference
# In the simplest case, expr("someCol") is equivalent to col("someCol")
expr('a')

In [None]:
# All the below are same! 
print(expr('a - 5'))
print(col('a') - 5)
print(expr('a') -5)

In [None]:
df.columns

### Records and Rows

In [None]:
# first() returns the first row as a Row object
df.first()

### Creating Rows

In [None]:
from pyspark.sql import Row 
# A Row built from positional values only: it has no field names or schema attached
myRow = Row("Hello", None, 1, False)

In [None]:
myRow

In [None]:
# Register df as a temporary view so it can be queried with SQL under the name `dfTable`
df.createOrReplaceTempView("dfTable")

In [None]:
spark.sql("select * from dfTable").show(10)

### Creating DataFrames

In [None]:
from pyspark.sql import Row
from pyspark.sql.types import StructType,StructField,StringType,LongType
# Explicit schema: StructField(column name, Spark type, nullable)
myManualSchema = StructType([
    StructField('first_name', StringType(), True),
    StructField('last_name', StringType(), True),
    StructField('age', LongType(), True)
])

# Positional Rows are matched to the schema's fields by position
alice = Row("Alice", "Henderson", 25)
bob = Row("Bob", "Sanders", 28)
spark.createDataFrame([alice, bob], myManualSchema).show()

In [None]:
# pip install sparksql-magic
# Load the extension; it provides the %%sparksql cell magic used in the next cell
# (presumably running the SQL against the active SparkSession — see the package docs)
%load_ext sparksql_magic

In [None]:
%%sparksql
select * from dfTable limit 250

### select and selectExpr

In [None]:
# Select columns by name; show(2, False) prints 2 rows without truncating long values
df.select('DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME').show(2, False)

In [None]:
# expr, col and column all produce equivalent column references here
from pyspark.sql.functions import expr, col, column
df.select(
    expr('DEST_COUNTRY_NAME'),
    col('DEST_COUNTRY_NAME'),
    column('DEST_COUNTRY_NAME')).show(2)

In [None]:
# String names and Column objects can be mixed in one select (no action, so only the schema is shown)
df.select(col('DEST_COUNTRY_NAME'), 'DEST_COUNTRY_NAME')

In [None]:
# expr() accepts full SQL, including an AS alias
df.select(expr('DEST_COUNTRY_NAME as destination')).show(2)

In [None]:
# .alias() replaces the name given by the AS inside the expression
df.select(expr('DEST_COUNTRY_NAME as destination').alias('alias_destination')).show(2)

In [None]:
# selectExpr(...) is shorthand for select(expr(...), ...)
df.selectExpr('DEST_COUNTRY_NAME as destination', 'DEST_COUNTRY_NAME').show(2)

In [None]:
# '*' keeps every existing column; the extra Boolean column is true when origin == destination
df.selectExpr('*', '(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withInCountry').show(10)

In [None]:
# Aggregation over the entire DataFrame
df.selectExpr('avg(count)','count(distinct(DEST_COUNTRY_NAME))').show(2)

### Converting to Spark Types (Literals)

In [None]:
from pyspark.sql.functions import lit
# lit() wraps a Python value as a constant Spark column
df.select('*',lit(1).alias('one')).show(2)

In [None]:
# Same result, with '*' written as an expression
df.select(expr('*'),lit(1).alias('one')).show(2)

### Adding Columns

In [None]:
df.withColumn('one', lit(1)).show(2)

In [None]:
# Another way to add a column!
df.withColumn('withInCountry', expr('DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME')).show(2)

In [None]:
# Another way to rename a column!
df.withColumn('destination',expr('DEST_COUNTRY_NAME')).show(2)

In [None]:
df.withColumnRenamed('DEST_COUNTRY_NAME','destination').columns

In [None]:
dfWithLongColName = df.withColumn('Col Name With Space',expr('DEST_COUNTRY_NAME'))

In [None]:
dfWithLongColName.show(2)

In [None]:
dfWithLongColName.selectExpr('`Col Name With Space`', '`Col Name With Space` as `new col`').show(2)

In [None]:
dfWithLongColName.select(col('Col Name With Space')).show(2)

In [None]:
# We need to escape expressions that use reserved characters/keywords
dfWithLongColName.select(expr('`Col Name With Space`')).show(2)

In [None]:
df.printSchema

In [None]:
df.withColumn('count2', col('count').cast('int')).printSchema

In [None]:
# df.filter(col('count') < 2).show(2)
# df.filter(expr('count') < 2).show(2)
df.filter(expr('count < 2')).show(2)

In [None]:
df.where('count < 2').show(2)

In [None]:
df.where(col('count') < 2).show(2)

In [None]:
df.where(col('count') < 2)\
    .where("ORIGIN_COUNTRY_NAME != 'Croatia'")\
    .show(2)

In [None]:
df.where(col('count') < 2).where(col('ORIGIN_COUNTRY_NAME') != 'Croatia').show(2)

In [None]:
df.select('DEST_COUNTRY_NAME','ORIGIN_COUNTRY_NAME').distinct().count()

In [None]:
df.select('ORIGIN_COUNTRY_NAME').distinct().count()

### Random Samples

In [None]:
# Sampling without replacement: a subset of the observations is selected randomly, and once an
# observation is selected it cannot be selected again.
# Sampling with replacement: a subset of observations is selected randomly, and an observation
# may be selected more than once.
# `fraction` is an expected proportion, not an exact row count; a fixed seed makes the sample
# (and therefore this count) reproducible across runs.
df.sample(withReplacement=False, fraction=0.9, seed=5).count()

### Concatenating and Appending Rows (Union)

In [None]:
from pyspark.sql import Row

# A one-row DataFrame that reuses df's schema so the union lines up
lastRow = Row('New Country', 'New Country', 1)
newDF = spark.createDataFrame([lastRow], df.schema)
# union() matches columns by position, not by name, so the column order must agree
df.union(newDF).where('DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME').show(300)

### Sorting Rows

In [None]:
df.sort('count').show(5)

In [None]:
df.orderBy('count','DEST_COUNTRY_NAME').show(5)

In [None]:
df.orderBy(col('count'), col('DEST_COUNTRY_NAME')).show(5)

In [None]:
df.orderBy(expr('count desc')).show(5)

In [None]:
df.orderBy(col('count').desc(), col('DEST_COUNTRY_NAME').asc()).show(5)

In [None]:
# For optimization purposes, it's sometimes advisable to sort within each partition before
# another set of transformations
spark.read \
        .format("json") \
        .load("/Applications/MAMP/htdocs/Spark-The-Definitive-Guide/data/flight-data/json/*-summary.json")\
        .sortWithinPartitions('count')

### Repartition & Coalesce
#### Further reading: [Managing Spark partitions with coalesce and repartition](https://mrpowers.medium.com/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4)

In [None]:
df.rdd.getNumPartitions()

In [None]:
df.repartition(5)

In [None]:
rangeDF=spark.range(10)

In [None]:
rangeDF.rdd.getNumPartitions()

In [None]:
rangeDF.write.csv('/Users/deepakagrawal/Desktop/data', 'overwrite')

In [None]:
increasedPartitionedDF=rangeDF.repartition(6)

In [None]:
increasedPartitionedDF.write.csv('/Users/deepakagrawal/Desktop/data', 'overwrite')

## Chapter 6: Working with Different Types of Data

In [None]:

# Chapter 6 switches to the retail dataset. This rebinds `df`, so the flight-data frame is no longer
# available. header=true uses the first line as column names; inferSchema=true makes Spark scan the
# data to guess column types.
# NOTE(review): absolute, machine-specific path — adjust for your environment.
df = spark.read.format('csv') \
          .option('header', 'true') \
          .option('inferSchema','true') \
          .load("/Applications/MAMP/htdocs/Spark-The-Definitive-Guide/data/retail-data/by-day/2010-12-01.csv")

In [None]:
df.printSchema()

In [None]:
df.schema.fields

In [None]:
df.columns

In [None]:
# describe() returns a new, lazy DataFrame of summary statistics; call .show() to compute and print it
df.describe()

In [None]:
# Re-register `dfTable` so SQL queries now see the retail data
df.createOrReplaceTempView('dfTable')

In [None]:
df.show(10, False)

In [None]:
# lit() infers the Spark type from the Python value (int, str and float here)
df.select(lit(5), lit("five"), lit(5.0)).printSchema()

In [None]:
df.select(lit(5), lit("five"), lit(5.0)).schema

In [None]:
df.select(lit(5), lit("five"), lit(5.0)).show(10)

In [None]:
# lit() makes a string *literal*; col/column/expr below make unresolved column *references*.
# 'first_name' is not a column of df, but nothing is checked until a Column is used in a query.
lit('first_name')

In [None]:
col('first_name')

In [None]:
column('first_name')

In [None]:
expr('first_name')

In [None]:
def spark_to_postgres_type(spark_type):
    """Map a Spark SQL type name to a PostgreSQL column type.

    Args:
        spark_type: the string returned by ``DataType.typeName()``
            (e.g. ``'string'``, ``'long'``, ``'decimal'``).

    Returns:
        The PostgreSQL type name as a string. Unrecognised type names
        (arrays, maps, structs, ...) fall back to ``'TEXT'``.
    """
    postgres_types = {
        'string': 'TEXT',
        'byte': 'SMALLINT',        # PostgreSQL has no 1-byte integer type
        'short': 'SMALLINT',
        'integer': 'INTEGER',
        'long': 'BIGINT',
        'float': 'REAL',
        'double': 'DOUBLE PRECISION',
        'decimal': 'NUMERIC',      # typeName() does not carry precision/scale
        'boolean': 'BOOLEAN',
        'date': 'DATE',
        'timestamp': 'TIMESTAMP',
        'timestamp_ntz': 'TIMESTAMP',
        'binary': 'BYTEA',
    }
    return postgres_types.get(spark_type, 'TEXT')

# Build a PostgreSQL CREATE TABLE statement from the retail DataFrame's schema.
# NOTE(review): column names are emitted unquoted, so names containing spaces would break the
# statement, and PostgreSQL folds unquoted mixed-case names (e.g. InvoiceNo) to lower case.
table_name = 'my_table'
column_defs = [
    f"{field.name} {spark_to_postgres_type(field.dataType.typeName())}"
    for field in df.schema.fields
]
# join() replaces trimming a trailing ", ", which corrupted the statement when there were no fields
create_table_sql = f"CREATE TABLE {table_name} (" + ", ".join(column_defs) + ")"
print(create_table_sql)

In [None]:
# Compare a column to a value with Python operators on Column objects
df.where(col('InvoiceNo') != 536365) \
  .select('InvoiceNo','Description') \
  .show(5, False)

In [None]:
# The same kind of predicate written as a SQL string
df.where('InvoiceNo = 536365').show(5, False)

In [None]:
# <> is SQL's not-equal operator
df.where('InvoiceNo <> 536365').show(5, False)

In [None]:
# Boolean Column expressions can be stored in variables and combined with | (or) and & (and).
# instr() returns the 1-based position of the substring, or 0 when it is absent.
priceFilter = col('UnitPrice') > 600
descripFilter = instr(df.Description, 'POSTAGE') >= 1
df.where(df.StockCode.isin('DOT')).where(priceFilter | descripFilter).show()

In [None]:
# A combined filter can also be materialized as a Boolean column and then filtered on by name
DOTCodeFilter = col('StockCode') == 'DOT'
priceFilter = col('UnitPrice') > 600
descripFilter = instr(df.Description, 'POSTAGE') >= 1
df.withColumn('isExpensive', DOTCodeFilter & (priceFilter | descripFilter)) \
  .where('isExpensive') \
  .select('UnitPrice', 'StockCode', 'Description',  'isExpensive').show()

In [None]:
# The same idea with the filter written as a SQL expression
df.withColumn('isExpensive', expr('NOT UnitPrice <= 250')) \
  .where('isExpensive') \
  .select('Description','UnitPrice').show()

In [None]:
# Working with null data when creating Boolean expressions:
# eqNullSafe (SQL <=>) treats null <=> null as true and never evaluates to null
df.where(col('Description').eqNullSafe("hello")).show()

In [None]:
# pow here is pyspark.sql.functions.pow (imported at the top), not Python's built-in
fabricatedQuantity = pow(col('Quantity') * col('UnitPrice'), 2) + 5
df.select(expr('CustomerID'), fabricatedQuantity.alias('realQuantity')).show()

In [None]:
# As SQL expression
df.selectExpr('CustomerID', '(POWER((Quantity * UnitPrice), 2) + 5) as realQuantity').show()

In [None]:
# Rounding: round() rounds half up (2.5 -> 3.0); bround() rounds half to even (2.5 -> 2.0).
# The string literal is implicitly cast to a number.
df.select(round(lit('2.5')), bround(lit('2.5'))).show(1)

In [None]:
# Compute Summary Statistics (count, mean, stddev, min, max per column)
df.describe().show()

In [None]:
# Add a unique ID to each row: the IDs are unique and increasing but not consecutive
from pyspark.sql.functions import monotonically_increasing_id
df.select(monotonically_increasing_id()).show(5)

### Working With Strings

In [None]:
# initcap() upper-cases the first letter of each word
from pyspark.sql.functions import initcap
df.select(col('Description')).show(10, False)
df.select(initcap(col('Description'))).show(10, False)

In [None]:
from pyspark.sql.functions import lower, upper
df.select(col('Description'), \
          lower(col('Description')), \
          upper(lower(col('Description')))) \
  .show(5, False)

In [None]:
# Trimming and padding. lpad/rpad pad to the given total length; a target length shorter
# than the string truncates it instead (lpad of 'HELLO' to 3 gives 'HEL').
from pyspark.sql.functions import lit, ltrim, rtrim, trim, lpad, rpad
df.select(
    ltrim(lit('     HELLO      ')).alias('ltrim'),
    rtrim(lit('     HELLO      ')).alias('rtrim'),
    trim(lit('     HELLO      ')).alias('trim'),
    lpad(lit('HELLO'), 3, '$').alias('lpad'),
    rpad(lit('HELLO'), 10, '#').alias('rpad')
  ).show(1)

### Regular Expressions

In [None]:
from pyspark.sql.functions import regexp_replace

# Replace every match of any listed color with the placeholder '{COLOR}'
regex_string = 'BLACK|WHITE|RED|GREEN|BLUE'
df.select(
    col('Description'),
    regexp_replace(
        col('Description'), regex_string, '{COLOR}').alias('color_clean')
    ).show(10, False)

In [None]:
# translate() maps characters one-for-one (L->1, E->3, T->7); it is not a substring replace
from pyspark.sql.functions import translate
df.select(
    col('Description'),
    translate(
        col('Description'), 'LET', '137')
    ).show(10, False)

In [None]:
# Pulling out the first mentioned color:
# group index 0 returns the whole match; the result is an empty string when nothing matches
from pyspark.sql.functions import regexp_extract
extract_string = 'BLACK|WHITE|RED|GREEN|BLUE'
df.select(
    col('Description'),
    regexp_extract(
        col('Description'), extract_string, 0).alias('color_clean')
    ).show(10, False)

In [None]:
# Substring checks without regular expressions: instr() >= 1 means "contains"
from pyspark.sql.functions import instr 
containsBlack = instr(col('Description'), 'BLACK') >=1
containsWhite = instr(col('Description'), 'WHITE') >=1
df.withColumn('hasSimpleColor', containsBlack | containsWhite) \
  .where('hasSimpleColor') \
  .select('Description') \
  .show(3, False)

In [None]:
from pyspark.sql.functions import expr, locate
simpleColors = ['black', 'white', 'red', 'green', 'blue']
def color_locator(column, color_string):
    """Return a Boolean column named ``is_<color_string>`` that is true when the
    upper-cased ``color_string`` occurs in ``column``.

    locate() gives the 1-based position (0 when absent); casting to boolean turns
    any non-zero position into true. Note: the parameter name ``column`` shadows the
    imported ``column`` function inside this function only.
    """
    return locate(color_string.upper(), column).cast('boolean').alias('is_' + color_string)
# One is_<color> flag per color, plus every original column via expr('*')
selectedColumns = [color_locator(df.Description, c) for c in simpleColors]
selectedColumns.append(expr('*'))
df.select(*selectedColumns).where(expr('is_white AND is_red')).select('Description').show(3, False)

### Working with Dates and Timestamps

In [None]:
# A one-row DataFrame holding today's date and the current timestamp
from pyspark.sql.functions import current_date, current_timestamp
dateDF = spark.range(1).withColumn('today', current_date()).withColumn('now', current_timestamp())
dateDF.show(1, False)
dateDF.printSchema()

In [None]:
# Shift a date backwards / forwards by a number of days
from pyspark.sql.functions import date_add, date_sub
dateDF.select(date_sub(col('today'), 5), date_add(col('today'), 5)).show(1, False)

In [None]:
# The name datediff is a shorthand derived from the words "date" and "difference".
# datediff(end, start) counts days from start to end, so (week_ago, today) gives -7.
from pyspark.sql.functions import datediff, months_between, to_date
dateDF.withColumn('week_ago', date_sub(col('today'), 7)) \
      .select(datediff(col('week_ago'), col('today'))).show(1)

In [None]:
# months_between(start, end) is negative when the first date is earlier (-12.0 here)
dateDF.select(
    to_date(lit('2016-01-01')).alias('start'), 
    to_date(lit('2017-01-01')).alias('end')) \
    .select(months_between(col('start'), col('end'))).show(1)

In [None]:
# A date written as a literal is just a string column ...
spark.range(1).withColumn('date', lit('2017-01-01')).printSchema()

In [None]:
# ... until to_date() parses it (default format yyyy-MM-dd) into a date column
spark.range(1).withColumn('date', lit('2017-01-01')) \
     .select(to_date(col('date'))).printSchema()

In [None]:
# With ANSI mode disabled (the default before Spark 4.0), Spark will not throw an error if it
# can't parse the date; rather, it will just return null
dateDF.select(to_date(lit('2016-20-12')), to_date(lit('2016-12-20'))).show(1)

In [None]:
# An explicit format tells Spark the day comes before the month
dateFormat = 'yyyy-dd-MM'
dateDF.select(to_date(lit('2016-20-12'), dateFormat), to_date(lit('2016-12-20'), dateFormat)).show(1)

In [None]:
# Under yyyy-MM-dd, '2016-20-12' has month 20 and is not a valid date
dateFormat = 'yyyy-MM-dd'
dateDF.select(to_date(lit('2016-20-12'), dateFormat), to_date(lit('2016-12-20'), dateFormat)).show(1)

In [None]:
# Parse with an explicit format first, then convert the resulting date to a timestamp
from pyspark.sql.functions import to_timestamp
dateFormat = 'yyyy-dd-MM'
dateDF.select(to_date(lit('2016-20-12'), dateFormat).alias('date')) \
      .select(to_timestamp(col('date'))).show()