In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import array,struct,lit,udf,concat,col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import avg, round,sum,max,min
spark = SparkSession.builder.appName("TuanKiet").getOrCreate()
sc = spark.sparkContext
salaries_df = spark.read.csv("salaries.csv", header=True, inferSchema=True)

In [44]:
data = sc.parallelize([(1,'mahher',500,1000),
        (2,'wafa',4000,1000),
        (3,'kita',5000,1000),
        (4,'kuta',500,1000)])
schema = StructType([StructField("age",IntegerType()),
                     StructField("education_num",StringType(),nullable=False),
                     StructField("marital_status",IntegerType()),
                     StructField("occupation",IntegerType())
                    ])
df = spark.createDataFrame(data=data,schema=schema)
salaries_df.show(5)

+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+------+
|work_year|experience_level|employment_type|           job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|scores|
+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+------+
|     2020|              EN|             FT| Azure Data Engineer|100000|            USD|       100000|                MU|           0|              MU|           S|     0|
|     2020|              EN|             CT|  Staff Data Analyst| 60000|            CAD|        44753|                CA|          50|              CA|           L|     0|
|     2020|              SE|             FT|Staff Data Scientist|164000|            USD|       164000|                US|          50|      

# groupBy

In [13]:
CA_jobs = salaries_df.where(salaries_df['company_location'] == "CA").where(salaries_df['experience_level']== "EN").groupBy().avg("salary_in_usd")
CA_jobs.show()

+------------------+
|avg(salary_in_usd)|
+------------------+
| 97330.53932584269|
+------------------+



In [39]:
(
    salaries_df
        .where(salaries_df['remote_ratio'] >= 50)
        .groupBy('company_location')
        .agg(round(min('remote_ratio'), 0).alias('avg_remote_ratio'))
        .show(5)
)

+----------------+----------------+
|company_location|avg_remote_ratio|
+----------------+----------------+
|              DZ|              50|
|              FI|              50|
|              UA|              50|
|              RO|              50|
|              NL|              50|
+----------------+----------------+
only showing top 5 rows


# Udf

In [50]:
def plusOne(x):
    return x + 1
plusOne_udf = udf(plusOne,IntegerType())
salaries_df.withColumn('TEST',plusOne_udf(col('remote_ratio'))).show(5)

+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+------+----+
|work_year|experience_level|employment_type|           job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|scores|TEST|
+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+------+----+
|     2020|              EN|             FT| Azure Data Engineer|100000|            USD|       100000|                MU|           0|              MU|           S|     0|   1|
|     2020|              EN|             CT|  Staff Data Analyst| 60000|            CAD|        44753|                CA|          50|              CA|           L|     0|  51|
|     2020|              SE|             FT|Staff Data Scientist|164000|            USD|       164000|             

In [40]:
schema = StructType([StructField("age",IntegerType()),
                     StructField("education_num",IntegerType(),nullable=False),
                     StructField("marital_status",StringType()),
                     StructField("occupation",StringType()),
                     StructField("income",StringType()),
                    ])
census_adult = spark.read.csv("salaries.csv", sep=',', header=True, schema=schema)
census_adult.show(5)

+----+-------------+--------------+--------------------+------+
| age|education_num|marital_status|          occupation|income|
+----+-------------+--------------+--------------------+------+
|2020|         NULL|            FT| Azure Data Engineer|100000|
|2020|         NULL|            CT|  Staff Data Analyst| 60000|
|2020|         NULL|            FT|Staff Data Scientist|164000|
|2020|         NULL|            FT|        Data Analyst| 42000|
|2020|         NULL|            FT|      Data Scientist|300000|
+----+-------------+--------------+--------------------+------+
only showing top 5 rows


In [66]:
census_adult.drop('education_num').show()

+----+--------------+--------------------+------+
| age|marital_status|          occupation|income|
+----+--------------+--------------------+------+
|2020|            FT| Azure Data Engineer|100000|
|2020|            CT|  Staff Data Analyst| 60000|
|2020|            FT|Staff Data Scientist|164000|
|2020|            FT|        Data Analyst| 42000|
|2020|            FT|      Data Scientist|300000|
|2020|            CT|  Sales Data Analyst| 60000|
|2020|            FT|  Staff Data Analyst| 15000|
|2020|            FT|Business Data Ana...| 95000|
|2020|            FT|        Data Analyst| 20000|
|2020|            FT|      Data Scientist| 43200|
|2020|            FT|Machine Learning ...|157000|
|2020|            FT|       Data Engineer| 48000|
|2020|            FT|Product Data Analyst| 20000|
|2020|            FT|       Data Engineer| 51999|
|2020|            FT|   Big Data Engineer| 70000|
|2020|            FT|      Data Scientist| 60000|
|2020|            FT|  Research Scientist|450000|


In [67]:
census_adult.createOrReplaceTempView('census')
result = spark.sql("SELECT age,occupation FROM census")
result.show()

+----+--------------------+
| age|          occupation|
+----+--------------------+
|2020| Azure Data Engineer|
|2020|  Staff Data Analyst|
|2020|Staff Data Scientist|
|2020|        Data Analyst|
|2020|      Data Scientist|
|2020|  Sales Data Analyst|
|2020|  Staff Data Analyst|
|2020|Business Data Ana...|
|2020|        Data Analyst|
|2020|      Data Scientist|
|2020|Machine Learning ...|
|2020|       Data Engineer|
|2020|Product Data Analyst|
|2020|       Data Engineer|
|2020|   Big Data Engineer|
|2020|      Data Scientist|
|2020|  Research Scientist|
|2020|        Data Analyst|
|2020|       Data Engineer|
|2020|      Data Scientist|
+----+--------------------+
only showing top 20 rows


In [32]:
result.withColumn('bonus',result.occupation*0.1).show()

+---+----------+-----+
|age|occupation|bonus|
+---+----------+-----+
|  1|      1000|100.0|
|  2|      1000|100.0|
|  3|      1000|100.0|
|  4|      1000|100.0|
+---+----------+-----+



In [41]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("UDF Example").getOrCreate()

data = [("kiệt",), ("tuấn",), ("kiet",)]
df = spark.createDataFrame(data, ["name"])

def to_upper(x):
    return x + 'A'

to_upper_udf = udf(to_upper, StringType())

salaries_df = salaries_df.withColumn("NAME_UPA", to_upper_udf(salaries_df.scores))
salaries_df.show()


+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+------+-------+--------+
|work_year|experience_level|employment_type|           job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|scores|NAME_UP|NAME_UPA|
+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+------+-------+--------+
|     2020|              EN|             FT| Azure Data Engineer|100000|            USD|       100000|                MU|           0|              MU|           S|Values|    MUA| ValuesA|
|     2020|              EN|             CT|  Staff Data Analyst| 60000|            CAD|        44753|                CA|          50|              CA|           L|Values|    CAA| ValuesA|
|     2020|              SE|             FT|Staff Data 

In [40]:
salaries_df.printSchema()

root
 |-- work_year: integer (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: integer (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: integer (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)
 |-- scores: string (nullable = false)
 |-- NAME_UP: string (nullable = true)



In [44]:
def plusA(s):
    return s + 'A'
plusA_udf = udf(plusA,StringType())
salaries_df = salaries_df.withColumn("NAME_UPB", plusA_udf(salaries_df['scores']))

In [None]:
result = salaries_df.filter(salaries_df.salary > 10000).filter(salaries_df.remote_ratio > 70)
result.show()

+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+------+-------+--------+--------+
|work_year|experience_level|employment_type|           job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|scores|NAME_UP|NAME_UPA|NAME_UPB|
+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+------+-------+--------+--------+
|     2020|              EX|             FT|      Data Scientist|300000|            USD|       300000|                US|         100|              US|           L|Values|    USA| ValuesA| ValuesA|
|     2020|              EN|             FT|        Data Analyst| 20000|            EUR|        22809|                PT|         100|              PT|           M|Values|    PTA| ValuesA| ValuesA|
|     2020