In [42]:
import duckdb
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name,explode,split,col, concat_ws


In [9]:
data = [
    [1 ,1323, {'satisfaction': 9, 'pain': 2, 'fatigue': 2}, '2020-06-25'],
    [2 ,9032,{'satisfaction': 2, 'pain': 7, 'fatigue': 5}, '2020-06-30'],
    [3 ,2331, {'satisfaction': 7, 'pain': 1, 'fatigue': 1}, '2020-07-05'],
    [4 ,2303, {'satisfaction': 8, 'pain': 9, 'fatigue': 0}, '2020-07-12'],
    [5 ,1323, {'satisfaction': 10, 'pain': 0, 'fatigue': 0}, '2020-07-09'],
    [6 ,2331, {'satisfaction': 8, 'pain': 9, 'fatigue': 5}, '2020-07-20'],
]

df = pd.DataFrame(data, columns=['id', 'patient_id', 'scores', 'date'])
df['date'] = pd.to_datetime(df['date'])

In [10]:
with duckdb.connect('sword.db') as con:
    con.sql('CREATE TABLE scores AS SELECT * FROM df')


In [11]:
with duckdb.connect('sword.db') as con:
    con.table('scores').show()

┌───────┬────────────┬─────────────────────────────────────────────────────────────┬─────────────────────┐
│  id   │ patient_id │                           scores                            │        date         │
│ int64 │   int64    │ struct(satisfaction integer, pain integer, fatigue integer) │    timestamp_ns     │
├───────┼────────────┼─────────────────────────────────────────────────────────────┼─────────────────────┤
│     1 │       1323 │ {'satisfaction': 9, 'pain': 2, 'fatigue': 2}                │ 2020-06-25 00:00:00 │
│     2 │       9032 │ {'satisfaction': 2, 'pain': 7, 'fatigue': 5}                │ 2020-06-30 00:00:00 │
│     3 │       2331 │ {'satisfaction': 7, 'pain': 1, 'fatigue': 1}                │ 2020-07-05 00:00:00 │
│     4 │       2303 │ {'satisfaction': 8, 'pain': 9, 'fatigue': 0}                │ 2020-07-12 00:00:00 │
│     5 │       1323 │ {'satisfaction': 10, 'pain': 0, 'fatigue': 0}               │ 2020-07-09 00:00:00 │
│     6 │       2331 │ {'satisfaction

In [22]:
with duckdb.connect('sword.db') as con:
    con.sql("""
                WITH unnested as (
                    SELECT 
                    id
                    ,patient_id
                    ,date
                    ,MONTHNAME(date) as month
                    ,YEAR(date) as year
                    ,CASE WHEN scores.satisfaction >= 8 THEN 'Promoter'
                        ELSE 'Detractor'
                    END as NPS
                    FROM scores),
                grouped as (SELECT
                    year
                    ,month
                    ,COUNT(CASE WHEN NPS = 'Promoter' THEN 1 END) as Promoters
                    ,COUNT(CASE WHEN NPS = 'Detractor' THEN 1 END) as Detractors
                    ,COUNT(*) as Total
                    FROM unnested
                    GROUP BY year, month)
                SELECT
                    year
                    ,month
                    ,(Promoters - Detractors) / Total as NPS
                FROM grouped
                    """
                ).show()

┌───────┬─────────┬────────┐
│ year  │  month  │  NPS   │
│ int64 │ varchar │ double │
├───────┼─────────┼────────┤
│  2020 │ June    │    0.0 │
│  2020 │ July    │    0.5 │
└───────┴─────────┴────────┘



In [26]:
spark = SparkSession.builder \
      .master("local[10]") \
      .appName("sword-health") \
      .getOrCreate() 

24/08/26 03:50:20 WARN Utils: Your hostname, Gustavos-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.22 instead (on interface en0)
24/08/26 03:50:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/26 03:50:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# [Dataset Source](https://huggingface.co/datasets/hakatiki/guttenberg-books-corpus)

In [33]:
#Dataset source https://huggingface.co/datasets/hakatiki/guttenberg-books-corpus
path = './books/'
df = spark.read.option("recursiveFileLookup", "true").text(path)
df = df.withColumn("filename", input_file_name())




In [45]:
df_count=(
  df.withColumn('word', explode(split(col('value'), ' ')))
    .groupBy('word')
    .count()
    .sort('count', ascending=False)
    .withColumn('text', concat_ws(' = ', col('word'), col('count')))
)
df_count.show()




+----+-------+-------------+
|word|  count|         text|
+----+-------+-------------+
|   a|1388639|  a = 1388639|
|    |1181349|    = 1181349|
|  az| 436808|  az = 436808|
|   –| 344590|   – = 344590|
|hogy| 285508|hogy = 285508|
| nem| 233974| nem = 233974|
|   s| 231370|   s = 231370|
|  és| 205993|  és = 205993|
|   A| 178977|   A = 178977|
|  is| 158089|  is = 158089|
| egy| 155425| egy = 155425|
|mint|  84881| mint = 84881|
|volt|  84663| volt = 84663|
| meg|  82173|  meg = 82173|
|csak|  81947| csak = 81947|
|  ki|  75033|   ki = 75033|
| azt|  74088|  azt = 74088|
|  de|  67948|   de = 67948|
| még|  67495|  még = 67495|
|  Az|  66971|   Az = 66971|
+----+-------+-------------+
only showing top 20 rows



                                                                                

In [48]:
#df_count.select('text').repartition(1).write.text('output')
df_count.select('text').toPandas().to_csv('output.txt', index=False)

                                                                                