# Apache Cassandra ile Veri Modelleme

## İş Problemi

Iyzico internetten alışveriş deneyimini hem alıcılar hem de satıcılar için kolaylaştıran bir finansal
teknolojiler şirketidir. E-ticaret firmaları, pazaryerleri ve bireysel kullanıcılar için ödeme altyapısı
sağlamaktadır. Elimizdeki veri seti ile geçmişe yönelik sorguların gerçekleştirileceği bir
Cassandra veri tabanı kurulmak istenmektedir.

In [1]:
import findspark
import pandas as pd
pd.set_option("display.max_columns", None)
pd.set_option('display.max_colwidth', None)
import time

In [81]:
import warnings
from pyspark.sql import SparkSession

# Uyarıları engellemek için filtreleme ayarlarını yapılandır
warnings.filterwarnings("ignore", category=DeprecationWarning)

In [2]:
findspark.init("/opt/manual/spark")

from pyspark.sql import SparkSession, functions as F

In [3]:
spark = (
    SparkSession.builder
    .appName("Spark-Cassandra-With-Catalog") 
    .master("local[2]") 
    .config("spark.driver.memory","2048m") 
    .config("spark.sql.shuffle.partitions", 4) 
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.0.0") 
    .config("spark.sql.extensions","com.datastax.spark.connector.CassandraSparkExtensions")
    .config("spark.sql.catalog.docker3nodescluster", 
            "com.datastax.spark.connector.datasource.CassandraCatalog") 
    .config("spark.sql.catalog.docker3nodescluster.spark.cassandra.connection.host", 
            "127.0.0.1")
    .getOrCreate()
)


In [4]:
df = spark.read.format("csv") \
.option("header", True) \
.option("sep", ",") \
.option("inferSchema", True) \
.option("timestampFormat","yyyy-MM-dd HH:mm:ss") \
.load("file:///home/train/datasets/iyzico.csv") 

In [5]:
df.limit(5).toPandas()

Unnamed: 0,iyzico_data.csv�������������������������������������������������������������������������������������0000664�0001750�0001750�04613461137�14176564341�013155� 0����������������������������������������������������������������������������������������������������ustar �train���������������������������train������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������transaction_date,merchant_id,category,total_paid_price
0,2018-01-01 00:00:20.000000,124381,Gündelik Eşya Mağazaları,410.112
1,2018-01-01 00:01:34.000000,124381,Gündelik Eşya Mağazaları,485.91
2,2018-01-01 00:02:03.000000,124381,Gündelik Eşya Mağazaları,66.42
3,2018-01-01 00:04:34.000000,124381,Gündelik Eşya Mağazaları,225.09
4,2018-01-01 00:04:36.000000,46774,Emlak Ofisleri ve Yöneticileri - Kiralama,479.34


In [6]:
df.printSchema()

root
 |-- iyzico_data.csv                                                                                     0000664 0001750 0001750 04613461137 14176564341 013155  0                                                                                                    ustar   train                           train                                                                                                                                                                                                                  transaction_date: string (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- total_paid_price: double (nullable = true)



In [7]:
from pyspark.sql.functions import col, to_timestamp

df = df.withColumnRenamed(df.columns[0], "transaction_date")
df = df.withColumn("transaction_date", to_timestamp(df["transaction_date"], "yyyy-MM-dd HH:mm:ss.SSSSSS"))

In [8]:
df.limit(5).toPandas()

Unnamed: 0,transaction_date,merchant_id,category,total_paid_price
0,2018-01-01 00:00:20,124381,Gündelik Eşya Mağazaları,410.112
1,2018-01-01 00:01:34,124381,Gündelik Eşya Mağazaları,485.91
2,2018-01-01 00:02:03,124381,Gündelik Eşya Mağazaları,66.42
3,2018-01-01 00:04:34,124381,Gündelik Eşya Mağazaları,225.09
4,2018-01-01 00:04:36,46774,Emlak Ofisleri ve Yöneticileri - Kiralama,479.34


In [9]:
df.count()

8391254

In [10]:
df.groupBy("category").count().show()

+--------------------+-------+
|            category|  count|
+--------------------+-------+
|Endüstriyel Malze...| 840951|
|Emlak Ofisleri ve...|1599559|
|Profesyonel Hizme...|1302725|
|Çeşitli Gıda Mağa...| 440029|
|Bilgisayar Yazılı...|1146442|
|Gündelik Eşya Mağ...|1935357|
|Kadın,Erkek Giyim...|1126191|
+--------------------+-------+



# Görev 2

### Adım 1: Spark versiyonunu gözlemleyiniz.

In [11]:
spark.version

'3.0.0'

### Adım 2: Notebook üzerinde Spark ile bir keyspace oluşturunuz.

In [12]:
spark.sql("CREATE DATABASE IF NOT EXISTS docker3nodescluster.iyzico \
  WITH DBPROPERTIES (class='SimpleStrategy', replication_factor='3')").show()

++
||
++
++



### Mevcut keyspaceleri gözlemleyiniz. (Terminalde)

In [None]:
"""
[train@localhost cassandra]$ docker exec -it cas1 bash
root@071d6266d13b:/# cqlsh
Connected to MyCluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.14 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh> describe keyspaces

system_schema  system  system_distributed  retail
system_auth    iyzico  system_traces     

"""

### Eğer keyspaceleri notebook üzerinde görmek isterseniz;

In [13]:
from cassandra.cluster import Cluster
import warnings
warnings.filterwarnings("ignore")

# Cassandra bağlantısını oluştur
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()

# Tüm keyspaceleri al
keyspaces = session.execute("SELECT keyspace_name FROM system_schema.keyspaces")

# KeySpace'leri görüntüle
for keyspace in keyspaces:
    print(keyspace.keyspace_name)


  from cryptography.hazmat.primitives import padding


retail
system_auth
system_schema
system_distributed
system
iyzico
system_traces


In [14]:
# 2. alternatif
cluster.metadata.keyspaces

{'retail': <cassandra.metadata.KeyspaceMetadata at 0x7fb7906e9c50>,
 'system_auth': <cassandra.metadata.KeyspaceMetadata at 0x7fb7906e9b70>,
 'system_schema': <cassandra.metadata.KeyspaceMetadata at 0x7fb7906a0da0>,
 'system_distributed': <cassandra.metadata.KeyspaceMetadata at 0x7fb7906650b8>,
 'system': <cassandra.metadata.KeyspaceMetadata at 0x7fb7906658d0>,
 'iyzico': <cassandra.metadata.KeyspaceMetadata at 0x7fb7906752b0>,
 'system_traces': <cassandra.metadata.KeyspaceMetadata at 0x7fb790675358>}

## Görev 3: Çoklanmış kayıtları kontrol etme

In [15]:
from pyspark.sql.functions import desc
df.groupBy('transaction_date', 'merchant_id', 'total_paid_price').count().alias('count')\
    .filter('count > 1').orderBy(desc('count')).show()

+-------------------+-----------+----------------+-----+
|   transaction_date|merchant_id|total_paid_price|count|
+-------------------+-----------+----------------+-----+
|2019-04-11 03:49:36|        535|           180.0|   11|
|2019-04-11 03:49:35|        535|           180.0|   10|
|2018-11-22 20:05:13|      42616|          388.71|    7|
|2019-11-28 20:05:29|      42616|          244.71|    7|
|2019-11-28 20:17:46|      42616|          244.71|    6|
|2019-11-28 20:18:04|      42616|          244.71|    6|
|2019-11-28 20:10:11|      42616|          244.71|    6|
|2020-08-05 12:10:36|      46774|          74.412|    6|
|2019-11-28 20:14:07|      42616|          244.71|    6|
|2019-11-28 20:15:14|      42616|          244.71|    5|
|2019-11-28 20:13:37|      42616|          244.71|    5|
|2019-11-28 20:18:18|      42616|          244.71|    5|
|2019-11-28 20:31:22|      42616|          287.91|    5|
|2018-11-22 20:03:26|      42616|          388.71|    5|
|2019-11-28 20:29:13|      4261

In [16]:
duplicates = df.groupBy('transaction_date', 'merchant_id', 'total_paid_price').count().alias('count')\
    .filter('count > 1')

In [17]:
df.count()

8391254

In [18]:
duplicates.count()

15766

In [19]:
updated_df = df.subtract(duplicates)

#Diğer alternatifler

#updated_df =df.subtract(duplicates)
#updated_df =df.distinct()
#updated_df =df.dropDuplicates()


In [20]:
updated_df.count()

8374318

In [21]:
updated_df.groupBy('transaction_date', 'merchant_id', 'total_paid_price').count().alias('count')\
        .filter('count > 1').show()

+----------------+-----------+----------------+-----+
|transaction_date|merchant_id|total_paid_price|count|
+----------------+-----------+----------------+-----+
+----------------+-----------+----------------+-----+



## Görev 4: Spark SQL ile EDA

### Adım 1: createOrReplaceTempView ile geçici bir tablo oluşturunuz.

In [22]:
updated_df.createOrReplaceTempView("iyz_temp")

In [23]:
spark.sql("SELECT * FROM iyz_temp LIMIT 10").show()

+-------------------+-----------+--------------------+----------------+
|   transaction_date|merchant_id|            category|total_paid_price|
+-------------------+-----------+--------------------+----------------+
|2018-01-01 00:31:12|     124381|Gündelik Eşya Mağ...|          66.438|
|2018-01-01 00:44:32|     124381|Gündelik Eşya Mağ...|          251.73|
|2018-01-01 01:09:28|     124381|Gündelik Eşya Mağ...|           484.2|
|2018-01-01 01:52:09|      46774|Emlak Ofisleri ve...|          142.38|
|2018-01-01 02:04:00|        535|Profesyonel Hizme...|             5.4|
|2018-01-01 09:43:23|     124381|Gündelik Eşya Mağ...|          719.91|
|2018-01-01 09:48:28|        535|Profesyonel Hizme...|            36.0|
|2018-01-01 10:28:12|        535|Profesyonel Hizme...|            90.0|
|2018-01-01 10:32:57|      46774|Emlak Ofisleri ve...|           51.84|
|2018-01-01 11:21:34|      46774|Emlak Ofisleri ve...|         1119.15|
+-------------------+-----------+--------------------+----------

In [24]:
updated_df.limit(5).toPandas()

Unnamed: 0,transaction_date,merchant_id,category,total_paid_price
0,2018-01-01 00:31:12,124381,Gündelik Eşya Mağazaları,66.438
1,2018-01-01 00:44:32,124381,Gündelik Eşya Mağazaları,251.73
2,2018-01-01 01:09:28,124381,Gündelik Eşya Mağazaları,484.2
3,2018-01-01 01:52:09,46774,Emlak Ofisleri ve Yöneticileri - Kiralama,142.38
4,2018-01-01 02:04:00,535,Profesyonel Hizmetler,5.4


### Kaç farklı Merchant Vardır?

In [25]:
spark.sql("SELECT COUNT(DISTINCT merchant_id) FROM iyz_temp").show()

+---------------------------+
|count(DISTINCT merchant_id)|
+---------------------------+
|                          7|
+---------------------------+



### Merchant kırılımında transaction sayıları nedir?

In [26]:
spark.sql("SELECT COUNT(*),merchant_id FROM iyz_temp GROUP BY merchant_id ORDER BY 2 DESC").show()   

+--------+-----------+
|count(1)|merchant_id|
+--------+-----------+
|  439667|     129316|
| 1933755|     124381|
|  838645|      86302|
| 1145818|      57192|
| 1598586|      46774|
| 1119318|      42616|
| 1298529|        535|
+--------+-----------+



### Kaç farklı kategori vardır?

In [27]:
spark.sql("SELECT COUNT(DISTINCT category) AS KATEGORISAYISI FROM iyz_temp").show()

+--------------+
|KATEGORISAYISI|
+--------------+
|             7|
+--------------+



### Kategori kırılımında transaction sayıları nedir?

In [28]:
spark.sql("SELECT COUNT(*) AS ISLEMSAYISI ,category FROM iyz_temp GROUP BY category").show()

+-----------+--------------------+
|ISLEMSAYISI|            category|
+-----------+--------------------+
|     838645|Endüstriyel Malze...|
|    1598586|Emlak Ofisleri ve...|
|    1298529|Profesyonel Hizme...|
|     439667|Çeşitli Gıda Mağa...|
|    1145818|Bilgisayar Yazılı...|
|    1933755|Gündelik Eşya Mağ...|
|    1119318|Kadın,Erkek Giyim...|
+-----------+--------------------+



### Günlere göre transaction sayıları nedir, ilk 10 gözlemi listeleyiniz.

In [29]:
spark.sql("SELECT date_format(transaction_date, 'yyyy-MM-dd') AS date, COUNT(*) AS day_transaction_numbers FROM iyz_temp GROUP BY date ORDER BY day_transaction_numbers DESC LIMIT 10").show()

+----------+-----------------------+
|      date|day_transaction_numbers|
+----------+-----------------------+
|2019-11-28|                  38807|
|2020-11-26|                  38634|
|2020-11-27|                  33417|
|2020-06-16|                  30254|
|2019-11-29|                  24441|
|2020-05-06|                  23282|
|2020-06-17|                  22933|
|2020-05-07|                  22624|
|2020-05-05|                  22180|
|2020-11-11|                  21549|
+----------+-----------------------+



### Haftanın günlerine göre transaction sayıları nedir?

In [30]:
from pyspark.sql.functions import date_format

In [31]:
spark.sql("SELECT date_format(transaction_date, 'EEEE') AS day_name,COUNT(*) as day_transaction_numbers FROM iyz_temp GROUP BY day_name").show()

+---------+-----------------------+
| day_name|day_transaction_numbers|
+---------+-----------------------+
|   Friday|                1246897|
|   Monday|                1311417|
|Wednesday|                1298032|
| Thursday|                1318480|
| Saturday|                1008384|
|  Tuesday|                1261709|
|   Sunday|                 929399|
+---------+-----------------------+



### transaction_date üzerinde kaç farklı gün vardır?

In [32]:
spark.sql("SELECT COUNT(DISTINCT DATE(transaction_date)) FROM iyz_temp").show()

+----------------------------------------------+
|count(DISTINCT CAST(transaction_date AS DATE))|
+----------------------------------------------+
|                                          1097|
+----------------------------------------------+



In [33]:
last_5_rows = updated_df.tail(5)
print(last_5_rows)

[Row(transaction_date=datetime.datetime(2020, 12, 31, 23, 50, 28), merchant_id=124381, category='Gündelik Eşya Mağazaları', total_paid_price=494.784), Row(transaction_date=datetime.datetime(2020, 12, 31, 23, 51, 18), merchant_id=42616, category='Kadın,Erkek Giyim Magazaları', total_paid_price=359.91), Row(transaction_date=datetime.datetime(2020, 12, 31, 23, 55, 9), merchant_id=124381, category='Gündelik Eşya Mağazaları', total_paid_price=737.982), Row(transaction_date=datetime.datetime(2020, 12, 31, 23, 56, 23), merchant_id=124381, category='Gündelik Eşya Mağazaları', total_paid_price=881.964), Row(transaction_date=datetime.datetime(2020, 12, 31, 23, 57, 35), merchant_id=124381, category='Gündelik Eşya Mağazaları', total_paid_price=719.982)]


### Adım 3: transaction_date sütunundaki gün bilgisi ile trans_day adında yeni bir değişken oluşturunuz.

In [34]:
df2 = df.withColumn("trans_day" , F.to_date("transaction_date"))

In [35]:
df2.limit(5).toPandas()

Unnamed: 0,transaction_date,merchant_id,category,total_paid_price,trans_day
0,2018-01-01 00:00:20,124381,Gündelik Eşya Mağazaları,410.112,2018-01-01
1,2018-01-01 00:01:34,124381,Gündelik Eşya Mağazaları,485.91,2018-01-01
2,2018-01-01 00:02:03,124381,Gündelik Eşya Mağazaları,66.42,2018-01-01
3,2018-01-01 00:04:34,124381,Gündelik Eşya Mağazaları,225.09,2018-01-01
4,2018-01-01 00:04:36,46774,Emlak Ofisleri ve Yöneticileri - Kiralama,479.34,2018-01-01


In [None]:
### Adım 4: Eşsiz trans_day ve merchant_id sayısı nedir, inceleyiniz.

In [41]:
df2.select("trans_day").distinct().count()

1097

In [42]:
df2.select("merchant_id").distinct().count()

7

In [43]:
df2.select("merchant_id").distinct().show()

+-----------+
|merchant_id|
+-----------+
|        535|
|      42616|
|     124381|
|      46774|
|     129316|
|      86302|
|      57192|
+-----------+



## Görev 5: Spark ile Cassandra Tablosu Yazma

### Adım1: Spark ile Cassandra üzerinde oluşturmuş olduğunuz keyspace'i kontrol ediniz.

In [51]:
spark.sql("""
SHOW NAMESPACES FROM docker3nodescluster
""").show()

+------------------+
|         namespace|
+------------------+
|     system_traces|
|            iyzico|
|            retail|
|       system_auth|
|system_distributed|
|     system_schema|
|            system|
+------------------+



In [46]:
df2.columns

['transaction_date',
 'merchant_id',
 'category',
 'total_paid_price',
 'trans_day']

In [48]:
df2.printSchema()

root
 |-- transaction_date: timestamp (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- total_paid_price: double (nullable = true)
 |-- trans_day: date (nullable = true)



### Adım2: Aşağıdaki iki önemli iş ihtiyacına uygun şekilde bir tablo oluşturunuz.
* Veri seti geriye dönük olarak sorgulanmak isteniyor. Örneğin 2018 yılı 3 Mart
* Aynı gün içinde kayıtlara tarih sırasına göre erişme ihtiyacı vardır.

In [88]:
spark.sql("""
    CREATE TABLE IF NOT EXISTS docker3nodescluster.iyzico.iyz_cas (
        transaction_date timestamp,
        merchant_id int,
        category string,
        total_paid_price double,
        trans_day date
    )
    USING cassandra
    PARTITIONED BY (trans_day)
    TBLPROPERTIES (
        clustering_key='transaction_date.desc, merchant_id.asc',
        compaction='{class=SizeTieredCompactionStrategy, bucket_high=1001}'
    )
""")


DataFrame[]

In [130]:
# terminalde cqlsh ile cassandra tablosunun bilgilerine bakmak için;

#cqlsh:iyzico> DESCRIBE TABLE iyz_cas;  

In [53]:
#Clustering key, verilerin tabloda nasıl sıralanacağını belirler.
#Partition key, verilerin nasıl bölümlere ayrılacağını belirler.

In [None]:
"""
CREATE TABLE IF NOT EXISTS docker3nodescluster.iyzico.iyz_cas:

CREATE TABLE ifadesi, yeni bir tablo oluşturulacağını belirtir.
IF NOT EXISTS ifadesi, tablonun zaten varsa tekrar oluşturulmamasını sağlar.
docker3nodescluster Cassandra kümesinin adını temsil eder.
iyzico anahtar alanını temsil eder.
iyz_cas tablo adını temsil eder.
Tablonun sütunları:

transaction_date sütunu, timestamp veri türüne sahip bir zaman damgasıdır.
merchant_id sütunu, int veri türüne sahip bir tamsayıdır.
category sütunu, string veri türüne sahip bir metin alanıdır.
total_paid_price sütunu, double veri türüne sahip bir ondalık sayıdır.
trans_day sütunu, date veri türüne sahip bir tarihi temsil eder.
USING cassandra:

Bu ifade, tablonun Cassandra veritabanında depolanacağını belirtir.
PARTITIONED BY (trans_day):

Bu ifade, trans_day sütununun tablonun bölümleme sütunu (partitioning column) olarak kullanılacağını belirtir. Veriler, bu sütuna göre dağıtılacak ve bölümlere ayrılacak.
TBLPROPERTIES:

Bu ifade, tablo özelliklerini belirtir.
clustering_key='transaction_date.desc, merchant_id.asc' ifadesi, transaction_date sütununun 
tablonun kümeleme anahtarı (clustering key) olarak kullanılacağını ve sıralamanın azalan (desc) 
olacağını belirtir. Ayrıca, merchant_id sütunu da kümeleme anahtarı olarak belirtilir ve sıralama artan (asc) olur.
compaction='{class=SizeTieredCompactionStrategy, bucket_high=1001}' ifadesi, tablonun bellek boyutuna 
dayalı bir birleştirme stratejisi kullanacağını belirtir. Bu strateji, veri bölümlerini birleştirirken 
belirli bir boyutu hedefler.
"""

In [89]:
#Tabloyu oluşturduk.Oluşturulan boş tabloyu cassandradan okuyarak kontrol edelim.
df_from_cassandra = spark.read.table("docker3nodescluster.iyzico.iyz_cas")
df_from_cassandra.limit(5).toPandas()

Unnamed: 0,trans_day,transaction_date,merchant_id,category,total_paid_price


### Adım 3: Spark ile veriyi bu tabloya yazınız.

In [87]:
from cassandra.cluster import Cluster

# Cassandra bağlantısını oluştur
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()

# Düğüm bilgilerini al
metadata = cluster.metadata
node_count = len(metadata.all_hosts())

print("Düğüm Sayısı:", node_count)

ERROR:cassandra.connection:Closing connection <AsyncoreConnection(140426376307768) 127.0.0.1:9042> due to protocol error: Error from server: code=000a [Protocol error] message="Beta version of the protocol used (5/v5-beta), but USE_BETA flag is unset"


Düğüm Sayısı: 3


* Cassandra tabloma veri yazarken tutarlılık seviyesini ayarını değiştirerek yazma işlemi performansı üzerinde kazanımlar elde edebiliriz.

* ONE: Veri yazma veya okuma işlemi için en az bir düğümde tamamlanmalıdır.
* LOCAL_QUORUM: Yerel bölge içerisindeki düğümlerin (rack) çoğunluğu tarafından onaylanmalıdır.
* QUORUM: Kümedeki düğümlerin (rack) çoğunluğu tarafından onaylanmalıdır.
* ALL: Tüm düğümler tarafından onaylanmalıdır.

In [None]:
##0:06:49.339959

In [91]:
#spark consistency level'ını ayarlamak için
spark.conf.set("spark.cassandra.output.consistency.level" ,"LOCAL_ONE")

In [92]:
from datetime import datetime

start_time = datetime.now()

df2.write.mode("append") \
    .format("org.apache.spark.sql.cassandra") \
    .saveAsTable("docker3nodescluster.iyzico.iyz_cas") 

end_time = datetime.now()

print(end_time - start_time)


0:05:56.732827


## Görev 6: Spark ile Cassandra Tablosunu Okuma

### Adım 1: Yazma işleminden sonra Spark ile Cassandra tablosunu okuyunuz.

* spark df'i cassandra verisini okurken tablonun metadatalarını getirmez örn; partition_key ya da cluster_key. Bunları terminalde cqlsh da kontrol edebiliriz.

In [99]:
cassandra_df = spark.read.table("docker3nodescluster.iyzico.iyz_cas")
#yukarıda oluşturduğumuz içi boş bu tabloya cassandradan veriyi spark ile alıp okuyalım
cassandra_df.limit(5).toPandas()

Unnamed: 0,trans_day,transaction_date,merchant_id,category,total_paid_price
0,2018-01-07,2018-01-07 23:58:33,46774,Emlak Ofisleri ve Yöneticileri - Kiralama,277.236
1,2018-01-07,2018-01-07 23:58:14,124381,Gündelik Eşya Mağazaları,368.478
2,2018-01-07,2018-01-07 23:58:13,124381,Gündelik Eşya Mağazaları,260.91
3,2018-01-07,2018-01-07 23:57:58,46774,Emlak Ofisleri ve Yöneticileri - Kiralama,466.344
4,2018-01-07,2018-01-07 23:57:02,124381,Gündelik Eşya Mağazaları,549.18


### Adım 2: 2018-01-17 tarihine ait kayıtları ekrana yazdırınız, kaç kayıt olduğunu bulunuz ve sorgunun süresini hesaplayınız.

Cassandra tablosunu oluştururken partition key olarak trans_day sütununu seçmemiz bu sorgu performansı kapsamında daha hızlı sonuç döndürecektir partiton key olmayan sorguların aksine 

In [107]:
cassandra_df.filter(cassandra_df.trans_day == "2018-01-17").limit(10).toPandas()

Unnamed: 0,trans_day,transaction_date,merchant_id,category,total_paid_price
0,2018-01-17,2018-01-17 23:59:11,57192,Bilgisayar Yazılım Mağazaları,194.184
1,2018-01-17,2018-01-17 23:59:01,42616,"Kadın,Erkek Giyim Magazaları",138.33
2,2018-01-17,2018-01-17 23:59:01,124381,Gündelik Eşya Mağazaları,2752.11
3,2018-01-17,2018-01-17 23:58:53,42616,"Kadın,Erkek Giyim Magazaları",138.33
4,2018-01-17,2018-01-17 23:58:33,124381,Gündelik Eşya Mağazaları,539.82
5,2018-01-17,2018-01-17 23:58:15,46774,Emlak Ofisleri ve Yöneticileri - Kiralama,160.326
6,2018-01-17,2018-01-17 23:57:53,57192,Bilgisayar Yazılım Mağazaları,9.792
7,2018-01-17,2018-01-17 23:57:52,57192,Bilgisayar Yazılım Mağazaları,79.992
8,2018-01-17,2018-01-17 23:57:47,124381,Gündelik Eşya Mağazaları,539.802
9,2018-01-17,2018-01-17 23:57:44,57192,Bilgisayar Yazılım Mağazaları,9.792


In [109]:
start_time = datetime.now()

cassandra_df.filter(cassandra_df.trans_day == "2018-01-17").count()

end_time = datetime.now()

print(cassandra_df.filter(cassandra_df.trans_day == "2018-01-17").count())
print(end_time - start_time)

3987
0:00:00.144610


### Adım 3: category='Bilgisayar Yazılım Mağazaları' olan kayıtları ekrana yazdırınız, kaç kayıt olduğunu bulunuz ve sorgunun süresini hesaplayınız.

In [110]:
cassandra_df.filter(cassandra_df.category == "Bilgisayar Yazılım Mağazaları").limit(10).toPandas()

Unnamed: 0,trans_day,transaction_date,merchant_id,category,total_paid_price
0,2020-01-22,2020-01-22 23:58:49,57192,Bilgisayar Yazılım Mağazaları,13.446
1,2020-01-22,2020-01-22 23:58:37,57192,Bilgisayar Yazılım Mağazaları,247.32
2,2020-01-22,2020-01-22 23:58:29,57192,Bilgisayar Yazılım Mağazaları,77.148
3,2020-01-22,2020-01-22 23:57:17,57192,Bilgisayar Yazılım Mağazaları,269.244
4,2020-01-22,2020-01-22 23:56:54,57192,Bilgisayar Yazılım Mağazaları,16.182
5,2020-01-22,2020-01-22 23:56:00,57192,Bilgisayar Yazılım Mağazaları,86.22
6,2020-01-22,2020-01-22 23:55:05,57192,Bilgisayar Yazılım Mağazaları,322.092
7,2020-01-22,2020-01-22 23:54:52,57192,Bilgisayar Yazılım Mağazaları,340.758
8,2020-01-22,2020-01-22 23:52:59,57192,Bilgisayar Yazılım Mağazaları,305.982
9,2020-01-22,2020-01-22 23:52:51,57192,Bilgisayar Yazılım Mağazaları,155.592


In [115]:
spark.sql("""

SELECT * FROM docker3nodescluster.iyzico.iyz_cas WHERE category = 'Bilgisayar Yazılım Mağazaları' 
""").show(5)

+----------+-------------------+-----------+--------------------+----------------+
| trans_day|   transaction_date|merchant_id|            category|total_paid_price|
+----------+-------------------+-----------+--------------------+----------------+
|2019-12-29|2019-12-29 23:59:44|      57192|Bilgisayar Yazılı...|         384.768|
|2019-12-29|2019-12-29 23:58:42|      57192|Bilgisayar Yazılı...|          54.036|
|2019-12-29|2019-12-29 23:56:37|      57192|Bilgisayar Yazılı...|          96.192|
|2019-12-29|2019-12-29 23:55:40|      57192|Bilgisayar Yazılı...|          54.036|
|2019-12-29|2019-12-29 23:54:43|      57192|Bilgisayar Yazılı...|           25.47|
+----------+-------------------+-----------+--------------------+----------------+
only showing top 5 rows



In [118]:
start_time = datetime.now()

cassandra_df.filter(cassandra_df.category == "Bilgisayar Yazılım Mağazaları").count()

end_time = datetime.now()

print(cassandra_df.filter(cassandra_df.category == "Bilgisayar Yazılım Mağazaları").count())
print(end_time - start_time)

1133405
0:00:20.199053


In [117]:
sorgu = spark.sql("""

SELECT * FROM docker3nodescluster.iyzico.iyz_cas WHERE category = 'Bilgisayar Yazılım Mağazaları' 
""")

start_time = datetime.now()

print(sorgu.count())

end_time = datetime.now()
print(end_time - start_time)

1133405
0:00:19.324734
