In [1]:
# Go to terminal and run source ~/venvspark/bin/activate; pip install elasticsearch==7.9.0

In [1]:
import findspark
import warnings
import pandas as pd
warnings.filterwarnings('ignore')
from elasticsearch import Elasticsearch, helpers
import time
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [2]:
findspark.init("/opt/manual/spark")

In [3]:
from pyspark.sql import SparkSession, functions as F

In [4]:
spark = (
    SparkSession.builder
    .appName("Spark Elasticsearch")
    .master("local[2]")
    .config("spark.driver.memory","2048m")
    .config("spark.sql.shuffle.partitions", 4)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:7.12.1") 
    .getOrCreate()
)



:: loading settings :: url = jar:file:/opt/manual/spark-3.1.1-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/train/.ivy2/cache
The jars for the packages stored in: /home/train/.ivy2/jars
org.elasticsearch#elasticsearch-spark-30_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-bfecd6da-8aa5-42c4-9751-d1caf6137246;1.0
	confs: [default]
	found org.elasticsearch#elasticsearch-spark-30_2.12;7.12.1 in central
	found org.scala-lang#scala-reflect;2.12.8 in central
	found org.slf4j#slf4j-api;1.7.6 in local-m2-cache
	found commons-logging#commons-logging;1.1.1 in local-m2-cache
	found javax.xml.bind#jaxb-api;2.3.1 in central
	found com.google.protobuf#protobuf-java;2.5.0 in local-m2-cache
	found org.apache.spark#spark-yarn_2.12;3.0.1 in central
:: resolution report :: resolve 1160ms :: artifacts dl 16ms
	:: modules in use:
	com.google.protobuf#protobuf-java;2.5.0 from local-m2-cache in [default]
	commons-logging#commons-logging;1.1.1 from local-m2-cache in [default]
	javax.xml.bind#jaxb-api;2.3.1 from central in [default]
	org.

In [5]:
# ! wget -O ~/datasets/housing.csv https://raw.githubusercontent.com/erkansirin78/datasets/master/housing.csv

In [6]:
df = spark.read.format("csv") \
.option("header", True) \
.option("inferSchema", True) \
.option("sep", ";") \
.load("file:///home/train/datasets/market1mil.csv.gz") \
.select("LOGICALREF", "ITEMCODE", "ITEMNAME", "AMOUNT", "PRICE", "LINENETTOTAL", "BRANCH", "CITY", "BRAND", "STARTDATE", "LONGITUDE","LATITUDE") \
.cache()

                                                                                

In [7]:
df.limit(5).toPandas()

                                                                                

Unnamed: 0,LOGICALREF,ITEMCODE,ITEMNAME,AMOUNT,PRICE,LINENETTOTAL,BRANCH,CITY,BRAND,STARTDATE,LONGITUDE,LATITUDE
0,1,8,TOZ SEKER,45,265,53,Batman Subesi,Batman,,3.01.2017 09:25,411351,378812
1,2,20868,KIRMIZI MERCIMEK,1006,28,282,Batman Subesi,Batman,BAKLİYAT,3.01.2017 09:25,411351,378812
2,3,8583,"TEST MATIK 1,5 KG NORMAL",1,495,495,Batman Subesi,Batman,TEST,3.01.2017 09:25,411351,378812
3,4,1454,BIZIM MAKARNA BONCUK,1,11,11,Batman Subesi,Batman,ÜLKER,3.01.2017 09:25,411351,378812
4,5,13519,FILIZ MAKARNA KISA KESME 500 GR,1,11,11,Batman Subesi,Batman,FİLİZ,3.01.2017 09:25,411351,378812


In [8]:
df.count()

                                                                                

999853

In [9]:
# You can impute nulls as you wish. I prefer drop.

In [10]:
df1 = df.na.drop()

In [11]:
df1.count()

                                                                                

942431

In [12]:
df1.filter("LOGICALREF == '588'").show()

[Stage 7:>                                                          (0 + 1) / 1]

+----------+--------+--------------------+------+-----+------------+-----------+----+-----+---------------+---------+--------+
|LOGICALREF|ITEMCODE|            ITEMNAME|AMOUNT|PRICE|LINENETTOTAL|     BRANCH|CITY|BRAND|      STARTDATE|LONGITUDE|LATITUDE|
+----------+--------+--------------------+------+-----+------------+-----------+----+-----+---------------+---------+--------+
|       588|    1314|BIZIM MARGARIN 25...|     1| 1,65|        1,65|Ağrı Subesi|Ağrı|ÜLKER|3.01.2017 17:37| 43021596| 39,6269|
+----------+--------+--------------------+------+-----+------------+-----------+----+-----+---------------+---------+--------+



                                                                                

# Schema modification

In [13]:
df1.printSchema()

root
 |-- LOGICALREF: string (nullable = true)
 |-- ITEMCODE: integer (nullable = true)
 |-- ITEMNAME: string (nullable = true)
 |-- AMOUNT: string (nullable = true)
 |-- PRICE: string (nullable = true)
 |-- LINENETTOTAL: string (nullable = true)
 |-- BRANCH: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- BRAND: string (nullable = true)
 |-- STARTDATE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- LATITUDE: string (nullable = true)



In [14]:
from pyspark.sql.types import *

In [76]:
df2 = df1 \
.withColumn("STARTDATE",  F.unix_timestamp( F.to_timestamp(F.col("STARTDATE"), "d.MM.yyyy HH:mm")) * 1000 ) \
.withColumn("LOGICALREF", F.col("LOGICALREF").cast(IntegerType())) \
.withColumn("ITEMCODE", F.col("ITEMCODE").cast(IntegerType())) \
.withColumn("AMOUNT", F.regexp_replace("AMOUNT", ",", ".").cast(FloatType())) \
.withColumn("PRICE", F.regexp_replace("PRICE", ",", ".").cast(FloatType())) \
.withColumn("LINENETTOTAL", F.regexp_replace("LINENETTOTAL", ",", ".").cast(FloatType())) \
.withColumn("LONGITUDE", F.regexp_replace("LONGITUDE", ",", ".").substr(1, 6).cast(FloatType())) \
.withColumn("LATITUDE", F.regexp_replace("LATITUDE", ",", ".").substr(1, 6).cast(FloatType())) \
.withColumn("LOCATION", 
                   F.array(  F.col("LONGITUDE"), F.col("LATITUDE"))) \
.filter("LONGITUDE < 100") # filter wrong longitude

In [77]:
df2.limit(5).toPandas()

Unnamed: 0,LOGICALREF,ITEMCODE,ITEMNAME,AMOUNT,PRICE,LINENETTOTAL,BRANCH,CITY,BRAND,STARTDATE,LONGITUDE,LATITUDE,LOCATION
0,2,20868,KIRMIZI MERCIMEK,1.006,2.8,2.82,Batman Subesi,Batman,BAKLİYAT,1483424700000,41.134998,37.881001,"[41.1349983215332, 37.88100051879883]"
1,3,8583,"TEST MATIK 1,5 KG NORMAL",1.0,4.95,4.95,Batman Subesi,Batman,TEST,1483424700000,41.134998,37.881001,"[41.1349983215332, 37.88100051879883]"
2,4,1454,BIZIM MAKARNA BONCUK,1.0,1.1,1.1,Batman Subesi,Batman,ÜLKER,1483424700000,41.134998,37.881001,"[41.1349983215332, 37.88100051879883]"
3,5,13519,FILIZ MAKARNA KISA KESME 500 GR,1.0,1.1,1.1,Batman Subesi,Batman,FİLİZ,1483424700000,41.134998,37.881001,"[41.1349983215332, 37.88100051879883]"
4,6,8639,BINGO ULTRA CAM.SUYU 750 ML KLASIK,1.0,2.45,2.45,Batman Subesi,Batman,BİNGO,1483424700000,41.134998,37.881001,"[41.1349983215332, 37.88100051879883]"


In [78]:
df2.printSchema()

root
 |-- LOGICALREF: integer (nullable = true)
 |-- ITEMCODE: integer (nullable = true)
 |-- ITEMNAME: string (nullable = true)
 |-- AMOUNT: float (nullable = true)
 |-- PRICE: float (nullable = true)
 |-- LINENETTOTAL: float (nullable = true)
 |-- BRANCH: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- BRAND: string (nullable = true)
 |-- STARTDATE: long (nullable = true)
 |-- LONGITUDE: float (nullable = true)
 |-- LATITUDE: float (nullable = true)
 |-- LOCATION: array (nullable = false)
 |    |-- element: float (containsNull = true)



In [86]:
market_one_mil =  {
  "settings": {
    "index": {
      "analysis": {
        "analyzer": {
          "custom_analyzer":
          {
            "type":"custom",
            "tokenizer":"standard",
            "filter":[
              "lowercase", "custom_edge_ngram"
            ]
          }
        },
        "filter": {
          "custom_edge_ngram": {
            "type": "edge_ngram",
            "min_gram":2,
            "max_gram": 10
            }
          }
        }
      }
    },
    "mappings": {
    "properties": {
      "LOGICALREF":    { "type": "integer" },  
      "ITEMCODE":  { "type": "integer"  }, 
      "ITEMNAME":   { "type": "text"  },
      "AMOUNT": {"type": "float"},
        "PRICE": {"type": "float"},
        "LINENETTOTAL": {"type": "float"},
         "BRANCH": {"type": "keyword"},
         "CITY": {"type": "keyword"},
         "BRAND": {"type": "keyword"},
      "STARTDATE": {
        "type":   "date"
      },
      "LOCATION": {"type": "geo_point"}
        
    }
  }
  }

In [87]:
es = Elasticsearch()

In [88]:
try:
    es.indices.delete("market-one-mil")
    print("market-one-mil  index deleted.")
except:
    print("No index")

market-one-mil  index deleted.


In [89]:
es.indices.create("market-one-mil", body=market_one_mil)

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'market-one-mil'}

# Write spark dataframe to Elasticseach

In [90]:
start_time = time.time()

df2.drop("LONGITUDE","LATITUDE").write \
    .format("org.elasticsearch.spark.sql") \
    .mode("overwrite") \
    .option("es.nodes", "localhost") \
    .option("es.port","9200") \
    .save("market-one-mil")


print("----- %s secs -----" %(time.time() - start_time))

[Stage 37:>                                                         (0 + 1) / 1]

----- 148.13287997245789 secs -----


                                                                                

# Read from ES with Spark

In [91]:
df_es = spark.read \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "localhost") \
    .option("es.port","9200") \
    .load("market-one-mil")

In [92]:
df_es.limit(5).toPandas()

Unnamed: 0,AMOUNT,BRANCH,BRAND,CITY,ITEMCODE,ITEMNAME,LINENETTOTAL,LOCATION,LOGICALREF,PRICE,STARTDATE
0,0.775,İstanbul Subesi,HAL,İstanbul,5711,HAVUC,1.4,"[28.977, 41.005]",20492,1.8,2017-01-09 18:36:00
1,1.0,İstanbul Subesi,KİNDER,İstanbul,7851,KINDER CIKOLATA 4LI 50GR,1.65,"[28.977, 41.005]",20493,1.65,2017-01-09 18:40:00
2,1.0,İstanbul Subesi,ÜLKER,İstanbul,22878,ULKER 80GR BATON SUTLU 260-,0.99,"[28.977, 41.005]",20494,0.99,2017-01-09 18:40:00
3,1.0,Afyonkarahisar Subesi,F SAFF,Afyonkarahisar,19121,F SAFF MENDIL CEP,0.2,"[30.556, 38.75]",20495,0.2,2017-01-09 18:48:00
4,1.0,Afyonkarahisar Subesi,FANTA,Afyonkarahisar,3912,FANTA 1 LT PORTAKAL PET SISE,1.95,"[30.556, 38.75]",20496,1.95,2017-01-09 18:48:00


In [93]:
df_es.selectExpr("MIN(STARTDATE)","MAX(STARTDATE)").show()

[Stage 39:>                                                         (0 + 1) / 1]

+-------------------+-------------------+
|     min(STARTDATE)|     max(STARTDATE)|
+-------------------+-------------------+
|2017-01-03 09:25:00|2017-05-25 21:00:00|
+-------------------+-------------------+



                                                                                

In [27]:
df_es.printSchema()

root
 |-- Country: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- InvoiceNo: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- TotalPrice: float (nullable = true)
 |-- UnitPrice: float (nullable = true)



In [29]:
df_es.selectExpr("MIN(InvoiceDate)", "MAX(InvoiceDate)").limit(30).toPandas()

Unnamed: 0,min(InvoiceDate),max(InvoiceDate)
0,2010-12-01 00:03:00,2011-12-09 11:59:00


In [30]:
df_es.count()

406829

In [26]:
spark.stop()