### 1. Load the data

In [4]:
! wget https://github.com/erkansirin78/datasets/raw/master/market1mil.csv.gz

--2023-04-18 12:48:17--  https://github.com/erkansirin78/datasets/raw/master/market1mil.csv.gz
Resolving github.com (github.com)... 140.82.121.3
Connecting to github.com (github.com)|140.82.121.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/erkansirin78/datasets/master/market1mil.csv.gz [following]
--2023-04-18 12:48:18--  https://raw.githubusercontent.com/erkansirin78/datasets/master/market1mil.csv.gz
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.109.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 44525776 (42M) [application/octet-stream]
Saving to: ‘market1mil.csv.gz’


2023-04-18 12:48:35 (2.88 MB/s) - ‘market1mil.csv.gz’ saved [44525776/44525776]



In [5]:
!gzip -d market1mil.csv.gz

### 2. Spark and Elasticsearch Libraries and Configuration

In [6]:
import findspark
import warnings
import pandas as pd
warnings.filterwarnings('ignore')
from elasticsearch import Elasticsearch, helpers
import time
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *

In [7]:
# Register the local Spark installation with findspark so PySpark can locate it.
# NOTE(review): findspark.init is normally called before `pyspark` is imported; the
# imports cell above already imports pyspark, which only works if pyspark is
# importable on its own — consider moving this call into the imports cell.
findspark.init(spark_home="/opt/manual/spark")

In [8]:
# Local Spark session (2 cores). The ES-Hadoop connector for Spark 3 / Scala 2.12
# is resolved from Maven through spark.jars.packages.
spark_settings = {
    "spark.driver.memory": "2048m",
    "spark.sql.shuffle.partitions": 4,
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.jars.packages": "org.elasticsearch:elasticsearch-spark-30_2.12:7.12.1",
}

spark_builder = SparkSession.builder.appName("Spark Elasticsearch").master("local[2]")
for setting_key, setting_value in spark_settings.items():
    spark_builder = spark_builder.config(setting_key, setting_value)
spark = spark_builder.getOrCreate()



:: loading settings :: url = jar:file:/opt/manual/spark-3.1.1-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/train/.ivy2/cache
The jars for the packages stored in: /home/train/.ivy2/jars
org.elasticsearch#elasticsearch-spark-30_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-98930bd1-18f5-4744-ae28-e4962631da71;1.0
	confs: [default]
	found org.elasticsearch#elasticsearch-spark-30_2.12;7.12.1 in central
	found org.scala-lang#scala-reflect;2.12.8 in central
	found org.slf4j#slf4j-api;1.7.6 in local-m2-cache
	found commons-logging#commons-logging;1.1.1 in local-m2-cache
	found javax.xml.bind#jaxb-api;2.3.1 in central
	found com.google.protobuf#protobuf-java;2.5.0 in local-m2-cache
	found org.apache.spark#spark-yarn_2.12;3.0.1 in central
:: resolution report :: resolve 973ms :: artifacts dl 7ms
	:: modules in use:
	com.google.protobuf#protobuf-java;2.5.0 from local-m2-cache in [default]
	commons-logging#commons-logging;1.1.1 from local-m2-cache in [default]
	javax.xml.bind#jaxb-api;2.3.1 from central in [default]
	org.ap

### 3. Read Data

In [9]:
# Read the semicolon-separated CSV that the download cells produced. The path is
# resolved against the current working directory (where wget/gzip wrote the file)
# instead of a hard-coded, machine-specific absolute path.
import pathlib

MARKET_CSV_URI = pathlib.Path("market1mil.csv").resolve().as_uri()

df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .option("sep", ";")
    .load(MARKET_CSV_URI)
    .select(
        "LOGICALREF", "ITEMCODE", "ITEMNAME", "AMOUNT", "PRICE", "LINENETTOTAL",
        "BRANCH", "CITY", "BRAND", "STARTDATE", "LONGITUDE", "LATITUDE",
    )
    # cached because several later cells (count, na.drop, transforms) reuse it
    .cache()
)

                                                                                

In [11]:
# Peek at the first five raw rows; the numeric columns are still comma-decimal strings here.
df.limit(5).toPandas()

Unnamed: 0,LOGICALREF,ITEMCODE,ITEMNAME,AMOUNT,PRICE,LINENETTOTAL,BRANCH,CITY,BRAND,STARTDATE,LONGITUDE,LATITUDE
0,1,8,TOZ SEKER,45,265,53,Batman Subesi,Batman,,3.01.2017 09:25,411351,378812
1,2,20868,KIRMIZI MERCIMEK,1006,28,282,Batman Subesi,Batman,BAKLİYAT,3.01.2017 09:25,411351,378812
2,3,8583,"TEST MATIK 1,5 KG NORMAL",1,495,495,Batman Subesi,Batman,TEST,3.01.2017 09:25,411351,378812
3,4,1454,BIZIM MAKARNA BONCUK,1,11,11,Batman Subesi,Batman,ÜLKER,3.01.2017 09:25,411351,378812
4,5,13519,FILIZ MAKARNA KISA KESME 500 GR,1,11,11,Batman Subesi,Batman,FİLİZ,3.01.2017 09:25,411351,378812


In [12]:
# Row count of the raw (cached) DataFrame, before any cleaning.
df.count()

                                                                                

999853

In [13]:
# Keep only rows with no null in any of the selected columns (how="any").
df1 = df.dropna()

In [14]:
# Row count after dropping rows that contain a null in any selected column.
df1.count()

                                                                                

942431

In [15]:
# Inferred schema: only ITEMCODE came out as an integer; every other column is a
# string and is cast in the transformation cell below.
df1.printSchema()

root
 |-- LOGICALREF: string (nullable = true)
 |-- ITEMCODE: integer (nullable = true)
 |-- ITEMNAME: string (nullable = true)
 |-- AMOUNT: string (nullable = true)
 |-- PRICE: string (nullable = true)
 |-- LINENETTOTAL: string (nullable = true)
 |-- BRANCH: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- BRAND: string (nullable = true)
 |-- STARTDATE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- LATITUDE: string (nullable = true)



In [14]:
# FIXME(review): `df2` is only defined by the `df2 = df1 ...` transformation cell
# below, so this cell raises NameError on Restart & Run All. Move it after that cell.
df2.limit(5).toPandas()

Unnamed: 0,LOGICALREF,ITEMCODE,ITEMNAME,AMOUNT,PRICE,LINENETTOTAL,BRANCH,CITY,BRAND,STARTDATE,LONGITUDE,LATITUDE,LOCATION
0,2,20868,KIRMIZI MERCIMEK,1.006,2.8,2.82,Batman Subesi,Batman,BAKLİYAT,1483424700000,411351,378812,"[41,1351, 37,8812]"
1,3,8583,"TEST MATIK 1,5 KG NORMAL",1.0,4.95,4.95,Batman Subesi,Batman,TEST,1483424700000,411351,378812,"[41,1351, 37,8812]"
2,4,1454,BIZIM MAKARNA BONCUK,1.0,1.1,1.1,Batman Subesi,Batman,ÜLKER,1483424700000,411351,378812,"[41,1351, 37,8812]"
3,5,13519,FILIZ MAKARNA KISA KESME 500 GR,1.0,1.1,1.1,Batman Subesi,Batman,FİLİZ,1483424700000,411351,378812,"[41,1351, 37,8812]"
4,6,8639,BINGO ULTRA CAM.SUYU 750 ML KLASIK,1.0,2.45,2.45,Batman Subesi,Batman,BİNGO,1483424700000,411351,378812,"[41,1351, 37,8812]"


In [16]:
def _comma_decimal(col_name):
    """Return `col_name` with its decimal comma replaced by a dot (still a string column)."""
    return F.regexp_replace(col_name, ",", ".")


# Cast the string columns to proper types:
#  - AMOUNT / PRICE / LINENETTOTAL use a decimal comma, so it is swapped for a dot
#    before the float cast;
#  - STARTDATE ("d.MM.yyyy HH:mm") becomes epoch milliseconds (unix seconds * 1000);
#  - LONGITUDE / LATITUDE keep only their first 6 characters before the float cast.
#    NOTE(review): this truncates precision (e.g. "41,1351" -> 41.135) — confirm intended;
#  - LOCATION is the [longitude, latitude] array mapped as a geo_point in Elasticsearch.
df2 = (
    df1
    .withColumn("LOGICALREF", F.col("LOGICALREF").cast(IntegerType()))
    .withColumn("ITEMCODE", F.col("ITEMCODE").cast(IntegerType()))
    .withColumn("AMOUNT", _comma_decimal("AMOUNT").cast(FloatType()))
    .withColumn("PRICE", _comma_decimal("PRICE").cast(FloatType()))
    .withColumn("LINENETTOTAL", _comma_decimal("LINENETTOTAL").cast(FloatType()))
    .withColumn(
        "STARTDATE",
        F.unix_timestamp(F.to_timestamp(F.col("STARTDATE"), "d.MM.yyyy HH:mm")) * 1000,
    )
    .withColumn("LONGITUDE", _comma_decimal("LONGITUDE").substr(1, 6).cast(FloatType()))
    .withColumn("LATITUDE", _comma_decimal("LATITUDE").substr(1, 6).cast(FloatType()))
    .withColumn("LOCATION", F.array(F.col("LONGITUDE"), F.col("LATITUDE")))
    # drop rows whose longitude is out of range (a null longitude is dropped too)
    .filter("LONGITUDE < 100")
)

In [17]:
# Confirm the casts: numeric columns are float/integer, STARTDATE is epoch millis
# (long) and LOCATION is a float array.
df2.printSchema()

root
 |-- LOGICALREF: integer (nullable = true)
 |-- ITEMCODE: integer (nullable = true)
 |-- ITEMNAME: string (nullable = true)
 |-- AMOUNT: float (nullable = true)
 |-- PRICE: float (nullable = true)
 |-- LINENETTOTAL: float (nullable = true)
 |-- BRANCH: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- BRAND: string (nullable = true)
 |-- STARTDATE: long (nullable = true)
 |-- LONGITUDE: float (nullable = true)
 |-- LATITUDE: float (nullable = true)
 |-- LOCATION: array (nullable = false)
 |    |-- element: float (containsNull = true)



### 4. Elasticsearch

In [29]:
# Settings and mappings for the `market_index` Elasticsearch index.
# ITEMNAME is indexed with `custom_analyzer` (standard tokenizer, lowercase, then
# edge n-grams of 2-10 characters), so a prefix such as "makar" matches "MAKARNA".
# Queries use the standard analyzer so the search terms are not split into n-grams.
# LOCATION is a geo_point fed from the [longitude, latitude] array built in Spark.
market_index = {
    "settings": {
        "index": {
            "analysis": {
                "analyzer": {
                    "custom_analyzer": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": ["lowercase", "custom_edge_ngram"],
                    }
                },
                "filter": {
                    "custom_edge_ngram": {
                        "type": "edge_ngram",
                        "min_gram": 2,
                        "max_gram": 10,
                    }
                },
            }
        }
    },
    "mappings": {
        "properties": {
            "LOGICALREF": {"type": "integer"},
            "ITEMCODE": {"type": "integer"},
            "ITEMNAME": {
                "type": "text",
                "analyzer": "custom_analyzer",
                "search_analyzer": "standard",
            },
            "AMOUNT": {"type": "float"},
            "PRICE": {"type": "float"},
            "LINENETTOTAL": {"type": "float"},
            "BRANCH": {"type": "keyword"},
            "CITY": {"type": "keyword"},
            "BRAND": {"type": "keyword"},
            "STARTDATE": {"type": "date"},
            "LOCATION": {"type": "geo_point"},
        }
    },
}

In [19]:
# Python client for index management, pointed explicitly at the same node the
# Spark connector writes to (es.nodes=localhost, es.port=9200). An explicit URL
# also works with newer client versions, which reject a host-less Elasticsearch().
es = Elasticsearch("http://localhost:9200")

In [30]:
# Drop any previous copy of the index so it is recreated with the mapping above.
# Only a missing index is treated as "nothing to delete"; connection or auth
# failures now raise instead of being reported as "No index".
if es.indices.exists(index="market_index"):
    es.indices.delete(index="market_index")
    print("market_index index deleted.")
else:
    print("No index")

No index


In [31]:
# Create the index using the settings/mappings dict `market_index` defined above.
es.indices.create(index="market_index", body=market_index)

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'market_index'}

### 5. Write to Elasticsearch

In [32]:
# Bulk-write the cleaned rows into `market_index` through the ES-Hadoop connector.
# LONGITUDE/LATITUDE are dropped because they are already combined into LOCATION.
# perf_counter() is a monotonic clock, so the elapsed time is not skewed by
# system clock adjustments the way time.time() can be.
# NOTE(review): mode("overwrite") makes the connector clear existing data in the
# target before writing; for the freshly created index "append" would also work.
start_time = time.perf_counter()

(
    df2.drop("LONGITUDE", "LATITUDE")
    .write.format("org.elasticsearch.spark.sql")
    .mode("overwrite")
    .option("es.nodes", "localhost")
    .option("es.port", "9200")
    .save("market_index")
)

print("----- %s secs -----" % (time.perf_counter() - start_time))



----- 35.742648124694824 secs -----


                                                                                

### 6. Read from Elasticsearch

In [39]:
# Read back the index written in the previous section. It must match the
# `.save("market_index")` target; reading any other index returns stale data.
df_elastic = (
    spark.read.format("org.elasticsearch.spark.sql")
    .option("es.nodes", "localhost")
    .option("es.port", "9200")
    .load("market_index")
)

In [40]:
# Preview documents read back from Elasticsearch; as the output shows, the columns
# come back in alphabetical order rather than the order they were written.
df_elastic.limit(5).toPandas()

Unnamed: 0,AMOUNT,BRANCH,BRAND,CITY,ITEMCODE,ITEMNAME,LINENETTOTAL,LOCATION,LOGICALREF,PRICE,STARTDATE
0,1.0,Şanlıurfa Subesi,FRİTO,Şanlıurfa,21196,CHEETOS 44 GR PEYNIR&SOGAN,1.0,"[38.796, 37.159]",17081,1.0,2017-01-08 17:28:00
1,1.0,Şanlıurfa Subesi,FRİTO,Şanlıurfa,21195,LAY`S KLASIK 125 GR.,2.5,"[38.796, 37.159]",17082,2.5,2017-01-08 17:28:00
2,1.0,Şanlıurfa Subesi,ÜLKER,Şanlıurfa,3871,ULKER CUBUK KRAKER 5'LI,1.0,"[38.796, 37.159]",17083,1.0,2017-01-08 17:28:00
3,1.0,Şanlıurfa Subesi,ÜLKER,Şanlıurfa,3871,ULKER CUBUK KRAKER 5'LI,1.0,"[38.796, 37.159]",17084,1.0,2017-01-08 17:28:00
4,0.414,Şanlıurfa Subesi,EKİCİ,Şanlıurfa,21036,EKICI TAM YAGLI BEYAZ PEYNIR,5.36,"[38.796, 37.159]",17085,12.95,2017-01-08 17:28:00


In [41]:
# Time span covered by the indexed sales: earliest and latest STARTDATE.
df_elastic.agg(F.min("STARTDATE"), F.max("STARTDATE")).show()

[Stage 20:>                                                         (0 + 1) / 1]

+-------------------+-------------------+
|     min(STARTDATE)|     max(STARTDATE)|
+-------------------+-------------------+
|2017-01-03 09:25:00|2017-05-25 21:00:00|
+-------------------+-------------------+



                                                                                

In [42]:
# STARTDATE (written as epoch milliseconds) is read back as a timestamp, and
# LOCATION as an array of doubles.
df_elastic.printSchema()

root
 |-- AMOUNT: float (nullable = true)
 |-- BRANCH: string (nullable = true)
 |-- BRAND: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- ITEMCODE: integer (nullable = true)
 |-- ITEMNAME: string (nullable = true)
 |-- LINENETTOTAL: float (nullable = true)
 |-- LOCATION: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- LOGICALREF: integer (nullable = true)
 |-- PRICE: float (nullable = true)
 |-- STARTDATE: timestamp (nullable = true)



In [43]:
# Number of documents read back; compare against the row count of the DataFrame written above.
df_elastic.count()

                                                                                

936156