In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.types import ArrayType, FloatType, BooleanType, DateType
from pyspark.sql.functions import col, expr
from pyspark.sql.functions import count, desc
from pyspark.sql.functions import year

# Reuse the running SparkSession when there is one; otherwise start a new
# session registered under the application name "TotalOrder".
spark = (
    SparkSession.builder
    .appName("TotalOrder")
    .getOrCreate()
)


#Dataframe from JSON files + basic data operations

In [None]:
# DataFrame from JSON files + basic data operations

# Explicit schema for the person records. The third StructField argument
# (nullable=True) means the field is allowed to hold an empty value.
schema_person = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("id", IntegerType()),
        ("first_name", StringType()),
        ("last_name", StringType()),
        # Nested field: one person can list several movies (2-3, etc.).
        ("fav_movies", ArrayType(StringType())),
        ("salary", FloatType()),
        ("image_url", StringType()),
        ("date_of_birth", DateType()),
        ("active", BooleanType()),
    ]
])

In [None]:
# Google Colab only: opens Colab's file-upload prompt and keeps whatever
# files.upload() returns in `uploaded`.
# NOTE(review): presumably this is where persons.json (read in the next cell)
# gets uploaded - confirm the uploaded file name matches.
from google.colab import files
uploaded = files.upload()

In [None]:
# Read persons.json using the explicit schema_person schema instead of letting
# Spark infer one. multiLine is passed as a real boolean (not the string
# "True") and the schema by keyword, so the call reads unambiguously.
person_df = spark.read.json("persons.json", schema=schema_person, multiLine=True)

In [None]:
# Columns and expressions

# Select two columns by wrapping their names with col().
person_df.select(col("first_name"), col("last_name")).show(5)

# Same result with another method: expr() parses each name as a SQL expression.
person_df.select(expr("first_name"), expr("last_name")).show(5)

In [None]:
from pyspark.sql.functions import concat_ws

# Concatenate two columns and run arithmetic on a third one.
person_df.select(
    # concat_ws takes the separator first: a single space between the names.
    concat_ws(" ", col("first_name"), col("last_name")).alias("full_name"),
    col("salary"),
    # New column showing the salary raised by 10 %.
    # Could also be written as: expr("salary * 0.1 + salary")
    (col("salary") * 0.1 + col("salary")).alias("salary_increase"),
).show(10)

In [None]:
# Filter and Where conditions
#
# DataFrame transformations are lazy and a notebook cell only renders its last
# bare expression, so each example ends in .show(5) to actually run the query
# and display its rows.

from pyspark.sql.functions import array_contains, year

# filter() and where() are the same thing: keep salaries lower than 3000.
person_df.filter("salary < 3000").show(5)
person_df.where("salary < 3000").show(5)

# Combine two conditions with AND (&); each comparison needs its own parentheses.
person_df.where((col("salary") < 3000) & (col("active") == True)).show(5)

# Filter by year extracted from a date column (OR of two years).
person_df.filter(
    (year("date_of_birth") == 2000) | (year("date_of_birth") == 1991)
).show(5)

# Filter on a value inside an array (nested) column.
# 'needed_value' is a placeholder - replace it with the movie title to look for.
person_df.where(array_contains(person_df.fav_movies, 'needed_value')).show(5)

In [None]:
# Distinct, Drop duplicates, Order by
#
# Every example ends in .show() so its result is executed and displayed;
# a bare DataFrame expression in the middle of a cell shows nothing.

from pyspark.sql.functions import count, desc

# Distinct values of one column.
person_df.select("active").distinct().show()

# Order by the derived "year" column first, then by "first_name".
(
    person_df
    .select(col("first_name"), year(col("date_of_birth")).alias("year"))
    .orderBy("year", "first_name")
    .show(10)
)

# Drop duplicates: keep one row per first_name.
# De-duplicate first and sort last, so the displayed order comes from the
# final orderBy rather than relying on dropDuplicates keeping an earlier sort.
(
    person_df
    .select(col("first_name"), year(col("date_of_birth")).alias("year"))
    .dropDuplicates(["first_name"])
    .orderBy("year", "first_name")
    .show(10)
)

# orderBy in descending order (ascending=False).
(
    person_df
    .select(col("first_name"), year(col("date_of_birth")).alias("year"))
    .orderBy("year", ascending=False)
    .show(10)
)


In [None]:
# Rows and Union

from datetime import date

from pyspark.sql import Row

# Create a DataFrame from a list of Rows - the column types are inferred by Spark.
# date_of_birth is given as a datetime.date (not a "YYYY-MM-DD" string) so the
# inferred column is a date, lining up with person_df's DateType column when
# the two frames are unioned below.
persons_row_list = [
    Row(101, "Mike", "Grays",
        ["Gladiator", "Shawshank Redemption", "Star Wars: Return of the Jedi"],
        4520.65, "http/someimage.com", date(2000, 6, 12), True),
    Row(103, "Walberg", "Beckon",
        ["Gladiator", "Shawshank Redemption", "Star Wars: Return of the Jedi"],
        4520.90, "http/someimage.com", date(2000, 6, 12), True),
]

# Column names mirror schema_person. union() matches columns by position, so
# the names do not affect the result, but keeping them identical avoids
# confusion when reading the two frames side by side.
new_df_persons = spark.createDataFrame(
    persons_row_list,
    ["id", "first_name", "last_name", "fav_movies", "salary",
     "image_url", "date_of_birth", "active"],
)

# Combine the two DataFrames (columns are resolved by position, not by name).
combined_persons = person_df.union(new_df_persons)
combined_persons.sort(desc('id')).show(10)

In [None]:
# Add, Rename, Drop columns

# NOTE: this import shadows Python's built-in round() from here on.
from pyspark.sql.functions import round

# Add a new column whose value is calculated with expr(): salary raised by 10 %.
updated_person_df = person_df.withColumn("salary increase", expr("salary * 0.1 + salary"))

# Add a birth-year column, rename fav_movies to movies, round the raised salary
# to two decimals into "salary_round", then drop the unrounded helper column.
# show() returns None, so the DataFrame is assigned first and displayed in a
# separate step; otherwise updated_person_df_2 would end up holding None.
updated_person_df_2 = (
    updated_person_df
    .withColumn('birth_year', year("date_of_birth"))
    .withColumnRenamed('fav_movies', 'movies')
    .withColumn("salary_round", round(col('salary increase'), 2))
    .drop('salary increase')
)
updated_person_df_2.show(10)

In [None]:
# Google Colab only: second file-upload prompt.
# NOTE(review): presumably for the CSV inputs read further down
# (salesdata.csv, new_fire.csv) - confirm which files are expected here.
from google.colab import files
uploaded = files.upload()

In [None]:
# Create a Spark DataFrame from CSV

# Every column is declared as a string; the trailing True (nullable) means
# the field is allowed to hold an empty value.
schema_new_df_2 = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        "Order ID",
        "Product",
        "Quantity Ordered",
        "Price Each",
        "Order Date",
        "Purchase Address",
    )
])

# header=True: the first line of the file is treated as the header row.
salesdata_df = spark.read.csv('salesdata.csv', schema=schema_new_df_2, header=True)
salesdata_df.show(10)

In [None]:
import glob

from google.colab import files

fire_df = spark.read.csv('new_fire.csv', inferSchema=True, header=True)

fire_df.show(10)         # show a 10-row sample
fire_df.printSchema()    # print the inferred schema
print(fire_df.columns)   # a bare expression mid-cell displays nothing, so print it

fire_df_new = fire_df.limit(10)  # select the first n rows

# Spark's CSV writer creates a *directory* named "fire_df_new.csv" containing
# part files, not a single file. coalesce(1) keeps the output in one part
# file, mode("overwrite") lets the cell be re-run without a "path already
# exists" error, and header=True keeps the column names in the export.
fire_df_new.coalesce(1).write.mode("overwrite").csv("fire_df_new.csv", header=True)

# Download the actual CSV part file from inside the output directory.
part_files = glob.glob("fire_df_new.csv/part-*.csv")
if not part_files:
    raise FileNotFoundError("No CSV part file was written to 'fire_df_new.csv'")
files.download(part_files[0])

In [None]:
# RDD
# Count the total number of distinct values across the yearly lists

data2001List = [
    'RIN1', 'RIN2', 'RIN3', 'RIN4', 'RIN5', 'RIN6', 'RIN7',
]
data2002List = [
    'RIN3', 'RIN4', 'RIN7', 'RIN8', 'RIN9',
]
data2003List = [
    'RIN4', 'RIN8', 'RIN10', 'RIN11', 'RIN12',
]

# Create one RDD object per yearly list.
data2001RDD, data2002RDD, data2003RDD = (
    spark.sparkContext.parallelize(yearly_values)
    for yearly_values in (data2001List, data2002List, data2003List)
)

RDD_2001_2002 = data2001RDD.union(data2002RDD)
# Union of all three years with duplicates removed (still an RDD object).
RDD_2001_2003 = RDD_2001_2002.union(data2003RDD).distinct()
research_data = RDD_2001_2003.collect()  # plain Python list
project_count = RDD_2001_2003.count()    # count applied to the RDD object

# Subtract: values of the 2001 RDD that do not appear in the 2002 RDD
first_year_rdd = data2001RDD.subtract(data2002RDD)

In [None]:
# Convert from Fahrenheit to Celsius

# (day label, temperature) pairs used as input for the conversion.
temperature_day = [
    ("day1", 59),
    ("day2", 57.2),
    ("day3", 53.6),
    ("day4", 55.4),
]

temperature_rdd = spark.sparkContext.parallelize(temperature_day)

def convert_temperature(f):
    """Convert a temperature from degrees Fahrenheit to degrees Celsius.

    Args:
        f: Temperature in degrees Fahrenheit (int or float).

    Returns:
        The temperature in degrees Celsius, computed as (f - 32) * 5 / 9.
    """
    return (f - 32) * 5 / 9

# Convert the temperature (second element) of every pair and collect the
# converted values into a list; the day labels are not kept.
temperature_result = (
    temperature_rdd
    .map(lambda day_reading: convert_temperature(day_reading[1]))
    .collect()
)

In [None]:
# RDD: count, split

# Split the sentence on single spaces and distribute the words as an RDD.
words_list = "this is really difficut to find a new project. really".split(" ")
words_rdd = spark.sparkContext.parallelize(words_list)
words_data = words_rdd.collect()
words_rdd.count()

# Keep only the unique words in a new RDD object and count them
no_duplicate_words = words_rdd.distinct()
no_duplicate_words.count()

#filter out words that starts with needed letter ("t")
def check_the_word_start(word, letter):
    """Tell whether `word` begins with `letter`.

    Args:
        word: The string to inspect.
        letter: The prefix to look for at the start of `word`.

    Returns:
        True when `word` starts with `letter`, otherwise False.
    """
    starts_with_letter = word.startswith(letter)
    return starts_with_letter

# Collect only the words that start with the letter "t".
words_rdd.filter(
    lambda candidate: check_the_word_start(candidate, "t")
).collect()

In [None]:
# Sort by key and by value
countries_list = [
    ("Canada", 51),
    ("Ukraine", 12),
    ("Poland", 25),
]

countries_rdd = spark.sparkContext.parallelize(countries_list)

# Sort by the key of each pair (the country name).
sort_countries_by_key = countries_rdd.sortByKey().collect()

# Swap every pair to (value, country) so the number becomes the key, then
# sort by that key in descending order (ascending=False).
sort_countries_by_value = (
    countries_rdd
    .map(lambda pair: (pair[1], pair[0]))
    .sortByKey(ascending=False)
    .collect()
)


In [None]:
# Sum up the list values

list_values = [1, 5, 2, 3, 4]

# reduce() folds the distributed values pairwise with the given function.
list_count = (
    spark.sparkContext
    .parallelize(list_values)
    .reduce(lambda running_total, value: running_total + value)
)
print(list_count)