In [1]:
import argparse
from urllib.parse import urlparse, unquote
from datetime import datetime
import time 
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, approx_count_distinct, count, col, when, regexp_extract, window, lit, to_timestamp
from pyspark.sql.types import StringType, TimestampType

spark = SparkSession.builder \
.config("spark.sql.shuffle.partitions", "4")\
.master("yarn") \
.appName("spark-course") \
.config("spark.driver.memory", "1g") \
.config("spark.driver.cores", "1") \
.config("spark.executor.instances", "6") \
.config("spark.executor.cores", "3") \
.config("spark.executor.memory", "1g") \
.getOrCreate()

spark.sparkContext.setLogLevel("WARN")


@pandas_udf(StringType())
def url2domain(url):

    domain = url.apply(lambda x: urlparse(unquote(x.strip())).netloc.strip()
                     if urlparse(unquote(x.strip())).scheme in ['http','https']
                     else '')

    return domain


@pandas_udf(TimestampType())
def ts2datetime(unix_ts):

    return unix_ts.apply(lambda x: datetime.fromtimestamp(x))

In [2]:
page_views_raw = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", 
            "brain-node1.bigdatateam.org:9092,brain-node2.bigdatateam.org:9092,brain-node3.bigdatateam.org:9092") \
.option("startingOffsets", "earliest") \
.option("subscribe", "page_views") \
.load()

page_views_raw.createOrReplaceTempView("page_views")

page_views = spark.sql("""
select
    cast(splitted[0] as long) as ts,
    cast(splitted[0] as double) as ts_2,
    splitted[1] as uid,
    splitted[2] as url,
    parse_url(splitted[2], 'HOST') as domain_m,
    splitted[3] as title,
    splitted[4] as ua,
    case when parse_url(splitted[2], 'HOST') like '%.ru' then 'ru' else 'not ru' end as zone_m
from (
    select split(cast(value as string), '\t') as splitted
    from page_views
) t
""")
page_views = page_views.withColumn("domain_0", url2domain(page_views.url)) 
page_views = page_views.withColumn("timestamp", to_timestamp(page_views.ts))


page_views = page_views.withColumn("zone_0", 
                                   when(regexp_extract(url2domain(page_views.url), r'.+\.(ru)$', 1) == "ru", "ru")
                                   .otherwise("not ru"))



#dates = ("2018-04-01 16:20:41",  "2018-04-01 16:20:43")
#
#timestamps = (
#    time.mktime(datetime.strptime(s, "%Y-%m-%d %H:%M:%S").timetuple())
#    for s in dates)
#
#q1 = "CAST(timestamp AS INT) BETWEEN {0} AND {1}".format(*timestamps)
#print(page_views.filter(col("timestamp") != col("timestamp_1")).count())

page_views\
.filter(col("zone_0") != col("zone_m"))\
.select("url", "domain_0", "domain_m", "zone_0", "zone_m")\
.show(500, vertical=True, truncate=False)

#streaming = page_views\
#.groupBy(window("timestamp", "2 seconds", "1 second"), "zone")\
#.agg(approx_count_distinct("uid").alias("unique"),
#    count("uid").alias("view"))\
#.orderBy(col("window").asc(), col("view").desc())\
#.limit(20)\
#.writeStream \
#.outputMode("complete") \
#.format("console") \
#.option("truncate", "false") \
#.trigger(processingTime='5 seconds')\
#.start()

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 url      | https://www.sportmaster.ru/product/10094944/?icid=catalog/sport/podarochnye_karty/|recent|10094944                                                                                                                                                                                                                       

In [10]:
page_views\
.filter(col("zone_0") == col("zone_m"))\
.select("url", "url_m", "domain", "zone_0", "zone", "zone_m")\
.show(50, vertical=True, truncate=False)

-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 url    | http://www.ntv.ru/novosti/1998904/?utm_referrer=https%3A%2F%2Fzen.yandex.com                                                                                                                                 
 url_m  | www.ntv.ru                                                                                                                                                                                                   
 domain | www.ntv.ru                                                                                                                                                                                                   
 zone_0 | ru                                                                                                                            

In [8]:
srt_ = 'https://www.pleer.ru/product_251365_Element_878D.html?frommarket=https%3A//market.yandex.ru/catalog/57707/list%3Ftext%3Delement+878d%26hid%3D4981541%26clid%3D545%26suggest%3D2%26glfilter%3D7893318%3A14440125%26rt%3D11%26was_redir%3D1%26rs%3DeJwzqlWS5uJJzUnNTc0rUbAwt0gROPboIbMSCweDALsGg1EcVwwXL8eqI6wCLBIMqrUaU_cCuWdPs&ymclid=225888382199394701800002'

urlparse(unquote(srt_.strip())).netloc.strip()

'www.pleer.ru'

In [13]:
streaming.stop()