In [2]:
from pyspark.sql import SparkSession

# SparkSession oluştur
spark = SparkSession.builder \
    .appName("CSV_Reader") \
    .getOrCreate()

# CSV dosyasını oku, noktalı virgül ayırıcısını kullanarak
df = spark.read.option("header", "true") \
    .option("delimiter", ";") \
    .option("inferSchema", "true") \
    .csv("/home/jovyan/work/MarketSales.csv")

# Dataframe'in ilk birkaç satırını göster
df.show()



+-----+--------+--------------------+-------+---------------+------+-----+------------+-------+--------+----------------+------------------+---------+-----------------+--------+---------+----------+----------------+---------+--------------+---------------+---------------+------------------+---------------+---------------+------+
|   ID|ITEMCODE|            ITEMNAME|FICHENO|          DATE_|AMOUNT|PRICE|LINENETTOTAL|LINENET|BRANCHNR|          BRANCH|          SALESMAN|     CITY|           REGION|LATITUDE|LONGITUDE|CLIENTCODE|      CLIENTNAME|BRANDCODE|         BRAND| CATEGORY_NAME1| CATEGORY_NAME2|    CATEGORY_NAME3|      STARTDATE|        ENDDATE|GENDER|
+-----+--------+--------------------+-------+---------------+------+-----+------------+-------+--------+----------------+------------------+---------+-----------------+--------+---------+----------+----------------+---------+--------------+---------------+---------------+------------------+---------------+---------------+------+
|11738|

In [3]:
df.select("DATE_").show(5)

+---------------+
|          DATE_|
+---------------+
|7.01.2017 00:00|
|6.01.2017 00:00|
|3.01.2017 00:00|
|3.01.2017 00:00|
|5.01.2017 00:00|
+---------------+
only showing top 5 rows



In [17]:
from pyspark.sql.functions import year, month, sum, to_timestamp

# Tarihi doğru formata dönüştür ve yıl/ay analizini yap
sales_trend = df.withColumn("DATE_FORMATTED", to_timestamp("DATE_", "d.MM.yyyy HH:mm")) \
                .withColumn("year", year("DATE_FORMATTED")) \
                .withColumn("month", month("DATE_FORMATTED")) \
                .groupBy("year", "month") \
                .agg(sum("LINENETTOTAL").alias("total_sales")) \
                .orderBy("year", "month")

sales_trend.show()

+----+-----+-----------+
|year|month|total_sales|
+----+-----+-----------+
|NULL| NULL|       NULL|
|2017|    1|    88857.0|
|2017|    2|    91138.0|
|2017|    3|   112005.0|
+----+-----+-----------+



In [6]:
from pyspark.sql.functions import year, month, sum, to_timestamp, datediff, max, count

# Tarihi doğru formata dönüştür
df_formatted = df.withColumn("DATE_FORMATTED", to_timestamp("DATE_", "d.MM.yyyy HH:mm"))

# Yıl ve aya göre toplam satışları hesapla
sales_trend = df_formatted.withColumn("year", year("DATE_FORMATTED")) \
                          .withColumn("month", month("DATE_FORMATTED")) \
                          .groupBy("year", "month") \
                          .agg(sum("LINENETTOTAL").alias("total_sales")) \
                          .orderBy("year", "month")

sales_trend.show()

+----+-----+-----------+
|year|month|total_sales|
+----+-----+-----------+
|NULL| NULL|       NULL|
|2017|    1|    88857.0|
|2017|    2|    91138.0|
|2017|    3|   112005.0|
+----+-----+-----------+



In [7]:
from pyspark.sql.functions import year, month, sum, to_timestamp, datediff, max, count, current_date, lit


# Tarihi doğru formata dönüştür
df_formatted = df.withColumn("DATE_FORMATTED", to_timestamp("DATE_", "d.MM.yyyy HH:mm"))

# Yıl ve aya göre toplam satışları hesapla
sales_trend = df_formatted.withColumn("year", year("DATE_FORMATTED")) \
                          .withColumn("month", month("DATE_FORMATTED")) \
                          .groupBy("year", "month") \
                          .agg(sum("LINENETTOTAL").alias("total_sales")) \
                          .orderBy("year", "month")

sales_trend.show()

# current_date'i bir Spark Column olarak al
current_date_col = current_date()

# RFM hesaplamasında bu Column'u kullan
rfm = df_formatted.withColumn("recency", datediff(current_date_col, "DATE_FORMATTED"))

# Diğer işlemler...
current_date_value = df_formatted.agg(max("DATE_FORMATTED")).collect()[0][0]

rfm = df_formatted.groupBy("CLIENTCODE") \
    .agg(
        datediff(lit(current_date_value), max("DATE_FORMATTED")).alias("recency"),
        count("FICHENO").alias("frequency"),
        sum("LINENETTOTAL").alias("monetary")
    )

rfm.show()

+----+-----+-----------+
|year|month|total_sales|
+----+-----+-----------+
|NULL| NULL|       NULL|
|2017|    1|    88857.0|
|2017|    2|    91138.0|
|2017|    3|   112005.0|
+----+-----+-----------+

+----------+-------+---------+--------+
|CLIENTCODE|recency|frequency|monetary|
+----------+-------+---------+--------+
|   1093856|     27|       39|    14.0|
|    536646|     22|       22|    NULL|
|    869396|     11|       31|    13.0|
|    924386|      1|       47|     8.0|
|    983041|     11|       15|     8.0|
|     17506|     42|       14|    10.0|
|    195395|      4|       52|    NULL|
|    164951|     36|        7|     3.0|
|    379975|      4|       16|     9.0|
|   1055537|      8|        8|     7.0|
|   1080760|     54|        8|     1.0|
|   1005313|     57|       48|    11.0|
|   1052349|     14|       70|    14.0|
|    350582|     36|        4|    NULL|
|    327859|     61|        7|    NULL|
|   1028795|      1|       22|    18.0|
|   1068016|     46|       23|     7.0|

In [1]:
print("")




In [8]:

from pyspark.sql.functions import col

# Mağaza bazında satış adedi ve satış miktarını hesapla
top_stores = df_formatted.groupBy("BRANCH") \
                         .agg(
                             count("FICHENO").alias("total_sales_count"),
                             sum("LINENETTOTAL").alias("total_sales_amount")
                         ) \
                         .orderBy(col("total_sales_amount").desc())

top_stores.show()


+-----------------+-----------------+------------------+
|           BRANCH|total_sales_count|total_sales_amount|
+-----------------+-----------------+------------------+
|  İstanbul Subesi|           113620|           52096.0|
|    Ankara Subesi|            40694|           21048.0|
|     İzmir Subesi|            32326|           13859.0|
|   Antalya Subesi|            17371|           12526.0|
|Diyarbakır Subesi|            12384|            9528.0|
| Gümüşhane Subesi|             1187|            9029.0|
|     Bursa Subesi|            21215|            8773.0|
|     Adana Subesi|            15861|            7216.0|
|     Konya Subesi|            15886|            6989.0|
| Zonguldak Subesi|            14387|            6731.0|
|   Kocaeli Subesi|            14292|            6629.0|
| Şanlıurfa Subesi|            14824|            6474.0|
|    Mersin Subesi|            12513|            6118.0|
| Gaziantep Subesi|            14445|            5869.0|
|     Hatay Subesi|            

#### En Popüler Ürünler

In [10]:
# En popüler ürünleri belirle
popular_products = df_formatted.groupBy("ITEMNAME") \
                               .agg(
                                   count("FICHENO").alias("total_sales_count"),
                                   sum("LINENETTOTAL").alias("total_sales_amount")
                               ) \
                               .orderBy(col("total_sales_count").desc())

popular_products.show()

+--------------------+-----------------+------------------+
|            ITEMNAME|total_sales_count|total_sales_amount|
+--------------------+-----------------+------------------+
|        EKMEK 250 GR|             9122|             294.0|
|                NULL|             7167|            5972.0|
|             DOMATES|             6018|             330.0|
|             PATATES|             5718|             360.0|
|            PORTAKAL|             5611|             248.0|
|           TOZ SEKER|             5242|            8984.0|
|               SOGAN|             4631|              68.0|
| OSMANCIK PIRINC KG.|             4101|            1218.0|
|               LIMON|             4093|              87.0|
|                 MUZ|             3934|             176.0|
|            KIVIRCIK|             3925|             701.0|
|            MAYDANOZ|             3808|             579.0|
|               HAVUC|             3632|              69.0|
|SİHİRLİ ELLER CİG...|             3566|

#### 2. Bölgesel Satış Analizi

In [9]:
# Bölgesel satış analizini yap
regional_sales = df_formatted.groupBy("REGION") \
                             .agg(
                                 count("FICHENO").alias("total_sales_count"),
                                 sum("LINENETTOTAL").alias("total_sales_amount")
                             ) \
                             .orderBy(col("total_sales_amount").desc())

regional_sales.show()

+-----------------+-----------------+------------------+
|           REGION|total_sales_count|total_sales_amount|
+-----------------+-----------------+------------------+
|          Marmara|           175658|           79029.0|
|       İç Anadolu|            96121|           45029.0|
|          Akdeniz|            75735|           39447.0|
|        Karadeniz|            67629|           39253.0|
|              Ege|            87170|           38200.0|
|Güneydoğu Anadolu|            63848|           31434.0|
|     Doğu Anadolu|            44938|           19608.0|
|             NULL|                1|              NULL|
+-----------------+-----------------+------------------+



#### Müşteri Segmentasyonu

In [11]:
# Müşterileri segmentlere ayır
customer_segments = rfm.withColumn("segment",
                                   when(col("recency") <= 30, "New")
                                   .when((col("recency") > 30) & (col("recency") <= 90), "Active")
                                   .otherwise("Inactive"))

customer_segments.show()

NameError: name 'when' is not defined

#### Satış Eğilimleri

In [12]:
# Zaman içindeki satış eğilimlerini analiz et
sales_trend = df_formatted.withColumn("DATE_FORMATTED", to_timestamp("DATE_", "d.MM.yyyy HH:mm")) \
                          .withColumn("year", year("DATE_FORMATTED")) \
                          .withColumn("month", month("DATE_FORMATTED")) \
                          .groupBy("year", "month") \
                          .agg(sum("LINENETTOTAL").alias("total_sales")) \
                          .orderBy("year", "month")

sales_trend.show()

+----+-----+-----------+
|year|month|total_sales|
+----+-----+-----------+
|NULL| NULL|       NULL|
|2017|    1|    88857.0|
|2017|    2|    91138.0|
|2017|    3|   112005.0|
+----+-----+-----------+



In [13]:
from pyspark.sql.functions import dayofyear

# Tarihi doğru formata dönüştür
df_formatted = df.withColumn("DATE_FORMATTED", to_timestamp("DATE_", "d.MM.yyyy HH:mm"))

# Yıla göre en çok satış yapılan günleri belirle
top_sales_days = df_formatted.withColumn("year", year("DATE_FORMATTED")) \
                             .withColumn("day_of_year", dayofyear("DATE_FORMATTED")) \
                             .groupBy("year", "day_of_year") \
                             .agg(sum("LINENETTOTAL").alias("total_sales")) \
                             .orderBy(col("total_sales").desc())

top_sales_days.show()

+----+-----------+-----------+
|year|day_of_year|total_sales|
+----+-----------+-----------+
|2017|         83|    11519.0|
|2017|         35|     9186.0|
|2017|         27|     6492.0|
|2017|         10|     6021.0|
|2017|         61|     5243.0|
|2017|         68|     4324.0|
|2017|         85|     4114.0|
|2017|         79|     3985.0|
|2017|         77|     3977.0|
|2017|         12|     3972.0|
|2017|         17|     3950.0|
|2017|         73|     3889.0|
|2017|         30|     3864.0|
|2017|         19|     3862.0|
|2017|         82|     3840.0|
|2017|         56|     3839.0|
|2017|         87|     3831.0|
|2017|         80|     3790.0|
|2017|         90|     3720.0|
|2017|         38|     3701.0|
+----+-----------+-----------+
only showing top 20 rows



#### En Popüler Ürünler