In [2]:
from os.path import abspath

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions

spark = SparkSession \
    .builder.appName("eurowings assignment for data engineer") \
    .master("local[*]") \
    .getOrCreate()

In [3]:
dfSearches = spark.read \
    .format("json") \
    .option("pathGlobFilter","*.json") \
    .load("/DataLake/searches/")

print(dfSearches.count())
# assert dfSearches.count()==65973,"Problem in loading json files!"

65973


In [5]:
# change the structure to use dimention tables
dfSearches=dfSearches.withColumn("date",functions.to_date("date_time","yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
dfSearches=dfSearches.withColumn("flight_date_outbound2",functions.to_date("flight_date_outbound"))
dfSearches=dfSearches.withColumn("flight_date_inbound2",functions.to_date("flight_date_inbound"))
dfSearches.show()
dfSearches.printSchema()

+--------------------+---------------+---------------+-------------------+--------------------+----------+----------+--------+--------------------+----------+---------------------+--------------------+
|           date_time|destination_out|destination_ret|flight_date_inbound|flight_date_outbound|origin_out|origin_ret|segments|          visitor_id|      date|flight_date_outbound2|flight_date_inbound2|
+--------------------+---------------+---------------+-------------------+--------------------+----------+----------+--------+--------------------+----------+---------------------+--------------------+
|2021-04-08T01:08:...|            BDS|           null|               null|          2021-05-04|       STR|      null|       1|1.161998938283984...|2021-04-08|           2021-05-04|                null|
|2021-04-29T02:21:...|            HAM|           null|               null|          2021-04-30|       MUC|      null|       3|4.455176468832717E10|2021-04-29|           2021-04-30|            

In [6]:
# which one is better save in hdfs or save in local parquet files
# change mode form overwrite to # append
dfSearches.write \
    .mode("overwrite") \
    .format("parquet") \
    .save("searches")
    # .partitionBy("flight_date_outbound")
    # .bucketBy(42,"visitor_id") \
    # .sortBy("date_time") \
print("write succeed.")

write succeed.


In [7]:
dfVisitors = spark.read \
    .format("json") \
    .option("pathGlobFilter","*.json") \
    .load("/DataLake/visitors/")

print(dfVisitors.count())

69993


In [8]:
# cleanse and change dataframe to using dimension tables
dfVisitors=dfVisitors.withColumn("date", functions.when(functions.length("visit_start")==19, functions.to_date("visit_start",'yyyy-MM-dd HH:mm:ss') ).otherwise(functions.to_date("visit_start",'yy-MM-dd HH:mm:ss')))
dfVisitors.show()
dfVisitors.printSchema()

+-----------+-------+-------------------+------------------+---------+------+----------+-------------------+--------------------+------+----------+
|countPerday|country| first_hit_pagename|          hits_avg|logged_in|region|registered|        visit_start|          visitor_id|visits|      date|
+-----------+-------+-------------------+------------------+---------+------+----------+-------------------+--------------------+------+----------+
|          1|    deu|             Select|      6.1710671605|        0|    he|     false|2021-03-22 00:00:00|2.541229877392200...|     1|2021-03-22|
|          2|    aut|               null|      6.1710671605|        0|     9|     false|2021-03-22 00:00:01|4.024411422207301E10|     1|2021-03-22|
|          3|    usa|           Français|      2.4684268642|        0|    ny|     false|2021-03-22 00:00:02|2.366994293358576...|     1|2021-03-22|
|          4|    deu|            Deutsch|      2.4684268642|        0|    nw|     false|2021-03-22 00:00:05|4.06

In [9]:
dfVisitors.write \
    .format("parquet") \
    .mode("overwrite") \
    .save("visitors")
    # .partitionBy("country") \
print("visitors save succeed.")

visitors save succeed.


In [16]:
spark.sql("select * from parquet.searches").printSchema()
spark.sql("select * from parquet.visitors").printSchema()

root
 |-- date_time: string (nullable = true)
 |-- destination_out: string (nullable = true)
 |-- destination_ret: string (nullable = true)
 |-- flight_date_inbound: string (nullable = true)
 |-- flight_date_outbound: string (nullable = true)
 |-- origin_out: string (nullable = true)
 |-- origin_ret: string (nullable = true)
 |-- segments: long (nullable = true)
 |-- visitor_id: double (nullable = true)
 |-- date: date (nullable = true)
 |-- flight_date_outbound2: date (nullable = true)
 |-- flight_date_inbound2: date (nullable = true)

root
 |-- countPerday: long (nullable = true)
 |-- country: string (nullable = true)
 |-- first_hit_pagename: string (nullable = true)
 |-- hits_avg: double (nullable = true)
 |-- logged_in: long (nullable = true)
 |-- region: string (nullable = true)
 |-- registered: boolean (nullable = true)
 |-- visit_start: string (nullable = true)
 |-- visitor_id: double (nullable = true)
 |-- visits: long (nullable = true)
 |-- date: date (nullable = true)



In [6]:
## Task3: Reports
# For this task we would like to have a simple report which shows number of searches
# per region, country and date (not date time). For this report you need, first
# to join the datasets. To make the join faster and more accurate, we assume
# that the region of a visitor in visitors dataset will not change on daily basis. Then,
# you should first get the latest entry of each visitor per day and later perform the join
# with searches dataset

dfVisitorsDaily = spark.sql("select visitor_id,date"
                            ",count(*) as count"
                            ",min(country) as country"
                            ",min(region) as region "
                            ",sum(countPerday) as countPerday"
                            ",avg(hits_avg) as hits_avg"
                            ",sum(visits) as visits"
                            " from parquet.visitors"
                            " group by visitor_id,date")
dfVisitorsDaily.show()

dfSearchesDaily = spark.sql("select visitor_id,date"
                            ",count(*) as count"
                            " from parquet.searches"
                            " group by visitor_id,date")
dfSearchesDaily.show()

+------------------+----------+-----+-------+------+-----------+------------------+------+
|        visitor_id|      date|count|country|region|countPerday|          hits_avg|visits|
+------------------+----------+-----+-------+------+-----------+------------------+------+
|              null|2003-05-21| 2393|    alb|    00|   12010995|29.132697748503144|  2857|
|              null|2012-05-21| 2528|    alb|    00|   12514875|  11.8187653339703|  3519|
|              null|2015-02-21| 2538|    alb|    00|   12782607| 7.536093600178818|  2793|
|              null|2022-03-21| 2485|    alb|    00|   12484186| 8.071805512470572|  2882|
|              null|2024-04-21| 2461|    alb|    02|   12218164| 9.381225705348516|  2938|
|              null|2027-01-21| 2502|    alb|    01|   12446348| 9.532350264548576|  2747|
|              null|2029-04-21| 2525|    are|    02|   12784393| 12.55378359470276|  3994|
|35709.209143347456|2021-01-27|    1|    srb|    25|       3573|     35.7921895309|     1|

In [None]:
dfReport = spark.sql("select s.date"
                     ",min(country) as country"
                     ",min(region) as region "
                     ",count(*) as count"
                     " from parquet.searches s"
                     " join parquet.visitors v on (v.visitor_id=s.visitor_id and v.date=s.date) "
                     " group by s.date")
dfReport.show()

In [22]:
dfReport2 = spark.sql("select date,country,region,sum(search_count) search_count"
                      " from (select v.visitor_id, v.date"
                      ",min(country) as country"
                      ",min(region) as region "
                      ",(select count(*) from parquet.searches s where v.visitor_id=s.visitor_id and v.date=s.date) as search_count"
                      " from parquet.visitors v"
                      " group by v.date,v.visitor_id)"
                      " group by date,country,region")
dfReport2.show()


AnalysisException: Correlated scalar subquery 'scalarsubquery(v.visitor_id, v.date)' is neither present in the group by, nor in an aggregate function. Add it to group by using ordinal position or wrap it in first() (or first_value) if you don't care which value you get.;
Aggregate [date#715, country#700, region#701], [date#715, country#700, region#701, sum(search_count#703L) AS search_count#704L]
+- SubqueryAlias __auto_generated_subquery_name
   +- Aggregate [date#715, visitor_id#713], [visitor_id#713, date#715, min(country#706) AS country#700, min(region#710) AS region#701, scalar-subquery#702 [visitor_id#713 && date#715] AS search_count#703L]
      :  +- Aggregate [count(1) AS count(1)#731L]
      :     +- Filter ((outer(visitor_id#713) = visitor_id#727) AND (outer(date#715) = date#728))
      :        +- SubqueryAlias s
      :           +- Relation [date_time#719,destination_out#720,destination_ret#721,flight_date_inbound#722,flight_date_outbound#723,origin_out#724,origin_ret#725,segments#726L,visitor_id#727,date#728,flight_date_outbound2#729,flight_date_inbound2#730] parquet
      +- SubqueryAlias v
         +- Relation [countPerday#705L,country#706,first_hit_pagename#707,hits_avg#708,logged_in#709L,region#710,registered#711,visit_start#712,visitor_id#713,visits#714L,date#715] parquet


In [None]:
dfReport3 = spark.sql("select s.date"
                      ",v.country"
                      ",v.region"
                      ",count(*) count"
                      " from parquet.searches s"
                      " left join (select visitor_id,date"
                      ",min(country) as country"
                      ",min(region) as region"
                      " from parquet.visitors"
                      " group by date,visitor_id"
                      ")v on (v.visitor_id=s.visitor_id and v.date=s.date)"
                      " group by s.date,v.country,v.region")
dfReport3.show()

In [38]:
dfReport4 = spark.sql("select s.date"
                      ",v.country"
                      ",v.region"
                      ",s.visitor_id"
                      " from parquet.searches s"
                      " left join (select visitor_id,date"
                      ",min(country) as country"
                      ",min(region) as region"
                      " from parquet.visitors"
                      " group by date,visitor_id"
                      ")v on (v.visitor_id=s.visitor_id and v.date=s.date)"
                      " where country='deu'"
                      " and region='hh'"
                      " and s.date=to_date('2021-01-27')"
                      )
print(dfReport4.count())
dfReport4.show(dfReport4.count())


20
+----------+-------+------+--------------------+
|      date|country|region|          visitor_id|
+----------+-------+------+--------------------+
|2021-01-27|    deu|    hh| 8.882751273974802E8|
|2021-01-27|    deu|    hh|1.0344174566455255E9|
|2021-01-27|    deu|    hh| 3.024205726639306E9|
|2021-01-27|    deu|    hh|3.5218130777586017E9|
|2021-01-27|    deu|    hh| 4.134361368976001E9|
|2021-01-27|    deu|    hh|4.1894549207397013E9|
|2021-01-27|    deu|    hh|  9.97218694109933E9|
|2021-01-27|    deu|    hh|1.034593218455849...|
|2021-01-27|    deu|    hh|1.573601994971426E10|
|2021-01-27|    deu|    hh|1.594437683350282...|
|2021-01-27|    deu|    hh|1.677667485349775E10|
|2021-01-27|    deu|    hh|1.974082641320227E10|
|2021-01-27|    deu|    hh|2.218354240864906...|
|2021-01-27|    deu|    hh|2.264144205478094...|
|2021-01-27|    deu|    hh|2.952606478717622E10|
|2021-01-27|    deu|    hh|3.069936315761232...|
|2021-01-27|    deu|    hh|3.636958449053944E10|
|2021-01-27|    d

In [35]:
dfVisitors_id = spark.sql("select visitor_id,date,count(*) count"
                          " from parquet.visitors"
                          " where country='deu'"
                          " and region='hh'"
                          " and date=to_date('2021-01-27')"
                          " group by visitor_id,date")
print(dfVisitors_id.count())
dfVisitors_id.show(dfVisitors_id.count())

425
+--------------------+----------+-----+
|          visitor_id|      date|count|
+--------------------+----------+-----+
|3.444789257338767E10|2021-01-27|    1|
|2.642867462508109E10|2021-01-27|    1|
| 8.620487442081109E7|2021-01-27|    1|
|3.748879316926177E10|2021-01-27|    1|
|  4.0812345958397E10|2021-01-27|    1|
|3.301248869131764...|2021-01-27|    1|
| 3.641714978234134E9|2021-01-27|    1|
|1.045980678225830...|2021-01-27|    1|
|4.195442110538188E10|2021-01-27|    1|
|  2.36535559884103E9|2021-01-27|    1|
|  7.48834893027153E8|2021-01-27|    1|
|2.101561544893404...|2021-01-27|    1|
|3.694311031136334E10|2021-01-27|    1|
| 2.856200875480243E9|2021-01-27|    1|
| 9.755158473935843E9|2021-01-27|    1|
| 8.882751273974802E8|2021-01-27|    1|
| 1.902631866647331E9|2021-01-27|    1|
|1.457959112292328...|2021-01-27|    1|
|1.178334931002990...|2021-01-27|    1|
|2.956012974925573...|2021-01-27|    1|
| 6.537627528574754E9|2021-01-27|    1|
| 4.29528793664338E10|2021-01-27|   

In [32]:
dfSearches_id = spark.sql("select visitor_id,date,count(*) count"
                          " from parquet.searches"
                          " where date=to_date('2021-01-27')"
                          " group by visitor_id,date")
print(dfSearches_id.count())
dfSearches_id.show(dfSearches_id.count())


484
+--------------------+----------+-----+
|          visitor_id|      date|count|
+--------------------+----------+-----+
| 4.472018050763942E9|2021-01-27|    1|
| 6.219173985321057E9|2021-01-27|    1|
| 6.608596429711302E9|2021-01-27|    1|
|3.376705176582297E10|2021-01-27|    1|
|2.091864146345038E10|2021-01-27|    1|
|2.039636221488659E10|2021-01-27|    1|
|4.505402058808208...|2021-01-27|    1|
| 1.924676053397164E9|2021-01-27|    1|
| 6.240660318618109E9|2021-01-27|    1|
|4.639420827537486E10|2021-01-27|    1|
|1.524418651991379...|2021-01-27|    1|
|1.992886859402716...|2021-01-27|    1|
|3.792619170100393E10|2021-01-27|    1|
|1.407743602342097...|2021-01-27|    1|
|2.167627549359676...|2021-01-27|    1|
|2.485558312125197...|2021-01-27|    1|
|2.083008360864667E10|2021-01-27|    1|
| 8.560955097686894E9|2021-01-27|    1|
|4.835369890642872E10|2021-01-27|    1|
|3.064217489374994E10|2021-01-27|    1|
|3.605812551823887E10|2021-01-27|    1|
| 6.592485649611476E9|2021-01-27|   

In [4]:
#df = spark.sql("select visitor_id,to_date(visit_start) from parquet.visitors limit 10")
# spark.sql("alter table parquet.visitors add columns (visit_start_date datetime)")
#spark.sql("alter table parquet.visitors add columns visit_start_date datetime")
#

root
 |-- countPerday: long (nullable = true)
 |-- first_hit_pagename: string (nullable = true)
 |-- hits_avg: double (nullable = true)
 |-- logged_in: long (nullable = true)
 |-- region: string (nullable = true)
 |-- registered: boolean (nullable = true)
 |-- visit_start: string (nullable = true)
 |-- visitor_id: double (nullable = true)
 |-- visits: long (nullable = true)
 |-- country: string (nullable = true)



In [19]:
# dfVisitors.withColumn("newcol",functions.to_date(dfVisitors["visit_start"]))
# dfVisitors=dfVisitors.withColumn("newcol",(functions.to_date("visit_start",'yyyy-MM-dd HH:mm:ss') if functions.length("visit_start")==19 else  functions.to_date("visit_start",'yy-MM-dd HH:mm:ss')))
# dfVisitors=dfVisitors.withColumn("newcol", functions.when(functions.length("visit_start")==19, functions.to_date("visit_start",'yyyy-MM-dd HH:mm:ss') ).otherwise(functions.to_date("visit_start",'yy-MM-dd HH:mm:ss')))
# dfVisitors=dfVisitors.withColumn("newcol2",(functions.col("visit_start").cast("date")))
# dfVisitors=dfVisitors.where("len(visit_start)==19").select(functions.to_date(dfVisitors.visit_start, 'yyyy-MM-dd HH:mm:ss').alias('newcol3')).collect()
# print("adding newcol2")

adding newcol2


In [76]:
# dfSearches.printSchema()
# dfSearches.select(["date_time","new_date"]).limit(1).show()
dfSearches.select(["date_time","new_date"]).show()
print(dfSearches.select(["date_time"]).limit(4).toPandas())

+--------------------+----------+
|           date_time|  new_date|
+--------------------+----------+
|2021-03-11T06:15:...|2021-03-11|
|2021-03-17T05:54:...|2021-03-17|
|2021-02-13T08:34:...|2021-02-13|
|2021-03-17T00:31:...|2021-03-17|
|2021-03-20T07:47:...|2021-03-20|
|2021-03-31T08:02:...|2021-03-31|
|2021-02-10T04:30:...|2021-02-10|
|2021-03-30T05:47:...|2021-03-30|
|2021-02-02T08:32:...|2021-02-02|
|2021-02-27T07:56:...|2021-02-27|
+--------------------+----------+

                  date_time
0  2021-03-11T06:15:23.000Z
1  2021-03-17T05:54:15.000Z
2  2021-02-13T08:34:03.000Z
3  2021-03-17T00:31:57.000Z


In [75]:
# 2021-03-11T06:15:23.000Z
# dfSearches=dfSearches.withColumn("new_date",functions.to_date("date_time","yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
# print("new_date col added1")


new_date col added1


In [None]:
    # (
#   df.write
#   .mode('overwrite') # or append
#   .partitionBy(col_name) # this is optional
#   .format('parquet') # this is optional, parquet is default
#   .option('path', output_path)
#   .save()
# )
# ------------------
# (
#   df.write
#   .mode('overwrite') # or append
#   .partitionBy(col_name) # this is optional
#   .bucketBy(n, col_name) # n is number of buckets
#   .sortBy(col_name)
#   .format('parquet') # this is optional, parquet is default
#   .option('path', output_path)
#   .saveAsTable(table_name)
# )
# ------------------
# (
#   df.write
#   .insertInto(table_name)
# )
# ----------------
# (
#   df.select(spark.table(table_name).columns)
#   .write
#   .insertInto(table_name)
# )
# -------------------
# spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
# (
#   df # having data only for specific partitions
#   .write
#   .insertInto(table_name, overwrite=True)
# )
# ---------------------
# (
#   df.repartition('year')
#   .sortWithinPartitions('user_id')
#   .write
#   .mode('overwrite')
#   .partitionBy('year')
#   .option('path', output_path)
#   .saveAsTable(table_name)
# )
# ------------------------
# (
#   df.repartition('year')
#   .sortWithinPartitions('year', 'user_id')
#   .write
#   .mode('overwrite')
#   .partitionBy('year')
#   .option('path', output_path)
#   .saveAsTable(table_name)
# )
# --------------------
# (
#   spark.table(table_name)
#   .filter(col('year') == 2020)
#   .filter(col('user_id') == 1)
#   .collect()
# )
# -----------------
#
sparkHive = (SparkSession
                .builder
                .appName('example-pyspark-read-and-write-from-hive')
                .config("hive.metastore.uris", "thrift://localhost:9083", conf=SparkConf())
                .enableHiveSupport()
                .getOrCreate()
                )
data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]
df = sparkHive.createDataFrame(data)

#---------------------------
# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = SparkSession\
    .builder\
    .appName("Python Spark SQL Hive integration example")\
    .config("spark.sql.warehouse.dir", warehouse_location)\
    .enableHiveSupport()\
    .getOrCreate()
