BÀI TEST DATA ENGINEER - NEXAR GAME

Giải nén thư mục

In [None]:
import zipfile
import os
import pandas as pd

# Đường dẫn file zip và thư mục giải nén
zip_path = 'drive-download-20250723T030341Z-1-001.zip'
extract_to = 'unzipped'

# Giải nén
os.makedirs(extract_to, exist_ok=True)
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_to)
print(" Giải nén xong!")

# Đọc các file JSON-ND
for filename in os.listdir(extract_to):
    if filename.endswith(('.json', '.jsonl', '.ndjson')):
        file_path = os.path.join(extract_to, filename)
        print(f"📄 Đọc file: {file_path}")
        df = pd.read_json(file_path, lines=True)
        print(df.head())


✅ Giải nén xong!


Ghi dữ liệu dạng json dump để xem dữ liệu

In [40]:
import pandas as pd
import gzip
import json

# Đọc dữ liệu từ file JSON-ND gzip
path = '/home/ittranphu/tienth/nexar/unzipped/event_dump_000000000069.json.gz'
df = pd.read_json(path, compression='gzip', lines=True)

# Chuyển DataFrame thành danh sách các dict (mỗi dòng là 1 record)
records = df.to_dict(orient='records')

# Ghi ra file JSON dạng chuẩn (dạng list các object)
output_path = '/home/ittranphu/tienth/nexar/output_dump69.json'
with open(output_path, 'w', encoding='utf-8') as f:
    json.dump(records, f, indent=2, ensure_ascii=False)

print(f"✅ Đã ghi {len(records)} bản ghi vào: {output_path}")


✅ Đã ghi 71273 bản ghi vào: /home/ittranphu/tienth/nexar/output_dump69.json


Làm phẳng dữ liệu thành dạng bảng và Ghi dữ liệu vào psql

In [None]:
#load cả thư mục
import gzip
import json
import os
from datetime import datetime
import pandas as pd
from sqlalchemy import create_engine

#  PostgreSQL connection
engine = create_engine('postgresql+psycopg2://tienth:tienth@localhost:5434/ke_toan_dw')

#  Path to folder
folder_path = '/home/ittranphu/tienth/nexar/unzipped'

def parse_event_params(params_list):
    flat = {}
    for param in params_list:
        key = param.get('key')
        val = param.get('value', {})
        if 'string_value' in val:
            flat[key] = val['string_value']
        elif 'int_value' in val:
            try:
                flat[key] = int(val['int_value'])
            except ValueError:
                flat[key] = val['int_value']
        elif 'float_value' in val:
            flat[key] = float(val['float_value'])
    return flat

def collect_all_keys_from_folder(folder_path, limit_per_file=100):
    event_keys = set()
    geo_keys = set()
    for fname in sorted(os.listdir(folder_path)):
        if not fname.endswith('.json.gz'):
            continue
        with gzip.open(os.path.join(folder_path, fname), 'rt', encoding='utf-8') as f:
            for i, line in enumerate(f):
                if i >= limit_per_file:
                    break
                record = json.loads(line)
                event_params = parse_event_params(record.get('event_params', []))
                event_keys.update(event_params.keys())
                geo = record.get('geo', {})
                geo_keys.update(geo.keys())
    return sorted(event_keys), sorted(geo_keys)

def process_record(record, event_keys, geo_keys):
    row = {}
    ts_micro = record.get('event_timestamp')
    ts = None
    if ts_micro:
        try:
            ts = datetime.utcfromtimestamp(int(ts_micro) / 1_000_000)
        except:
            ts = None

    row['event_date'] = datetime.strptime(str(record.get('event_date')), "%Y%m%d").date() if record.get('event_date') else None
    row['event_timestamp'] = ts
    row['event_name'] = record.get('event_name')
    row['user_id'] = record.get('user_id')

    geo = record.get('geo', {})
    for key in geo_keys:
        row[f'geo_{key}'] = geo.get(key)

    event_params = parse_event_params(record.get('event_params', []))
    for key in event_keys:
        row[key] = event_params.get(key)

    return row

def flatten_all_files_to_postgres(folder_path):
    print(" Scanning all files to collect keys...")
    event_keys, geo_keys = collect_all_keys_from_folder(folder_path)

    base_fields = ['event_date', 'event_timestamp', 'event_name', 'user_id']
    geo_fields = [f'geo_{k}' for k in geo_keys]
    all_fields = base_fields + geo_fields + event_keys

    for fname in sorted(os.listdir(folder_path)):
        if not fname.endswith('.json.gz'):
            continue
        full_path = os.path.join(folder_path, fname)
        print(f" Processing file: {fname}")
        rows = []
        with gzip.open(full_path, 'rt', encoding='utf-8') as fin:
            for i, line in enumerate(fin):
                try:
                    record = json.loads(line)
                    row = process_record(record, event_keys, geo_keys)
                    rows.append(row)
                except Exception as e:
                    print(f" Error in line {i} of {fname}: {e}")
                    continue

                if len(rows) >= 1000:
                    df = pd.DataFrame(rows)
                    df.to_sql("event_logs_flat", engine, if_exists='append', index=False)
                    print(f" Inserted 1000 rows from {fname}")
                    rows = []

            if rows:
                df = pd.DataFrame(rows)
                df.to_sql("event_logs_flat", engine, if_exists='append', index=False)
                print(f" Inserted {len(rows)} final rows from {fname}")

#  Run
if __name__ == "__main__":
    flatten_all_files_to_postgres(folder_path)


📌 Scanning all files to collect keys...
🚀 Processing file: event_dump_000000000000.json.gz


  ts = datetime.utcfromtimestamp(int(ts_micro) / 1_000_000)


✅ Inserted 1000 rows from event_dump_000000000000.json.gz
✅ Inserted 1000 rows from event_dump_000000000000.json.gz
✅ Inserted 1000 rows from event_dump_000000000000.json.gz
✅ Inserted 1000 rows from event_dump_000000000000.json.gz
✅ Inserted 1000 rows from event_dump_000000000000.json.gz
✅ Inserted 1000 rows from event_dump_000000000000.json.gz
✅ Inserted 1000 rows from event_dump_000000000000.json.gz
✅ Inserted 1000 rows from event_dump_000000000000.json.gz
✅ Inserted 1000 rows from event_dump_000000000000.json.gz
✅ Inserted 1000 rows from event_dump_000000000000.json.gz
✅ Inserted 1000 rows from event_dump_000000000000.json.gz
✅ Inserted 1000 rows from event_dump_000000000000.json.gz
✅ Inserted 1000 rows from event_dump_000000000000.json.gz
✅ Inserted 1000 rows from event_dump_000000000000.json.gz
✅ Inserted 1000 rows from event_dump_000000000000.json.gz
✅ Inserted 1000 rows from event_dump_000000000000.json.gz
✅ Inserted 1000 rows from event_dump_000000000000.json.gz
✅ Inserted 100

a.Tính tỉ lệ người chơi thắng ở các level 1, 5, 10 cho toàn bộ user

Tỉ lệ người chơi thắng ở level 1 = 1 - (tỉ lệ người chơi thua ở level 1)

A = Số lần người chơi thua ở level 10.

B = Số lần người chơi chơi ở level 10 hoặc cao hơn, và có result rõ ràng (win hoặc lose).  những người đã ít nhất hoàn thành level 10.

Tính tỉ lệ thắng = 1 - (số lần thua ở level 10 / tổng số lần chơi level ≥10)

In [25]:
# test với 1 file dữ liệu
import pandas as pd
from pandasql import sqldf

# Đọc dữ liệu từ CSV
df = pd.read_csv('/home/ittranphu/tienth/nexar/flattened_full_output.csv')

# Hàm thực thi SQL
pysqldf = lambda q: sqldf(q, globals())

# Truy vấn lọc bản ghi
query = """
SELECT
    'level_1' AS level_label,
    (SELECT COUNT(*) FROM df WHERE level = 1 AND result = 'lose') AS lose_at_level,
    (SELECT COUNT(*) FROM df WHERE level >= 1 AND result IS NOT NULL) AS total_after_level,
    CASE
        WHEN (SELECT COUNT(*) FROM df WHERE level >= 1 AND result IS NOT NULL) = 0 THEN 0
        ELSE 1.0 - 1.0 * 
            (SELECT COUNT(*) FROM df WHERE level = 1 AND result = 'lose') /
            (SELECT COUNT(*) FROM df WHERE level >= 1 AND result IS NOT NULL)
    END AS win_rate
UNION ALL
SELECT
    'level_5' AS level_label,
    (SELECT COUNT(*) FROM df WHERE level = 5 AND result = 'lose'),
    (SELECT COUNT(*) FROM df WHERE level >= 5 AND result IS NOT NULL),
    CASE
        WHEN (SELECT COUNT(*) FROM df WHERE level >= 5 AND result IS NOT NULL) = 0 THEN 0
        ELSE 1.0 - 1.0 * 
            (SELECT COUNT(*) FROM df WHERE level = 5 AND result = 'lose') /
            (SELECT COUNT(*) FROM df WHERE level >= 5 AND result IS NOT NULL)
    END
UNION ALL
SELECT
    'level_10' AS level_label,
    (SELECT COUNT(*) FROM df WHERE level = 10 AND result = 'lose'),
    (SELECT COUNT(*) FROM df WHERE level >= 10 AND result IS NOT NULL),
    CASE
        WHEN (SELECT COUNT(*) FROM df WHERE level >= 10 AND result IS NOT NULL) = 0 THEN 0
        ELSE 1.0 - 1.0 * 
            (SELECT COUNT(*) FROM df WHERE level = 10 AND result = 'lose') /
            (SELECT COUNT(*) FROM df WHERE level >= 10 AND result IS NOT NULL)
    END

"""

# Thực thi
filtered_df = pysqldf(query)

# In kết quả
print(filtered_df)



  level_label  lose_at_level  total_after_level  win_rate
0     level_1              0              34510   1.00000
1     level_5             10              32237   0.99969
2    level_10            143              29852   0.99521


In [8]:
import pandas as pd
from sqlalchemy import create_engine

# Kết nối đến PostgreSQL
engine = create_engine('postgresql+psycopg2://tienth:tienth@localhost:5434/ke_toan_dw')

# Câu SQL: tính tỉ lệ chiến thắng ở các level 1, 5, 10
sql_query = """
SELECT
    'level_1' AS level_label,
    (SELECT COUNT(*) FROM event_logs_flat WHERE level = 1 AND result = 'lose') AS lose_at_level,
    (SELECT COUNT(*) FROM event_logs_flat WHERE level >= 1 AND result IS NOT NULL) AS total_after_level,
    CASE
        WHEN (SELECT COUNT(*) FROM event_logs_flat WHERE level >= 1 AND result IS NOT NULL) = 0 THEN 0
        ELSE 1.0 - 1.0 * 
            (SELECT COUNT(*) FROM event_logs_flat WHERE level = 1 AND result = 'lose') /
            (SELECT COUNT(*) FROM event_logs_flat WHERE level >= 1 AND result IS NOT NULL)
    END AS win_rate
UNION ALL
SELECT
    'level_5' AS level_label,
    (SELECT COUNT(*) FROM event_logs_flat WHERE level = 5 AND result = 'lose'),
    (SELECT COUNT(*) FROM event_logs_flat WHERE level >= 5 AND result IS NOT NULL),
    CASE
        WHEN (SELECT COUNT(*) FROM event_logs_flat WHERE level >= 5 AND result IS NOT NULL) = 0 THEN 0
        ELSE 1.0 - 1.0 * 
            (SELECT COUNT(*) FROM event_logs_flat WHERE level = 5 AND result = 'lose') /
            (SELECT COUNT(*) FROM event_logs_flat WHERE level >= 5 AND result IS NOT NULL)
    END
UNION ALL
SELECT
    'level_10' AS level_label,
    (SELECT COUNT(*) FROM event_logs_flat WHERE level = 10 AND result = 'lose'),
    (SELECT COUNT(*) FROM event_logs_flat WHERE level >= 10 AND result IS NOT NULL),
    CASE
        WHEN (SELECT COUNT(*) FROM event_logs_flat WHERE level >= 10 AND result IS NOT NULL) = 0 THEN 0
        ELSE 1.0 - 1.0 * 
            (SELECT COUNT(*) FROM event_logs_flat WHERE level = 10 AND result = 'lose') /
            (SELECT COUNT(*) FROM event_logs_flat WHERE level >= 10 AND result IS NOT NULL)
    END;
"""

# Thực thi SQL và hiển thị kết quả
ket_qua = pd.read_sql_query(sql_query, con=engine)
print(ket_qua)


  level_label  lose_at_level  total_after_level  win_rate
0     level_1              0            2421097  1.000000
1     level_5            878            2265493  0.999612
2    level_10          10734            2095775  0.994878


Kết quả : 

level 1, 100% người chơi thắng

level 5: 99,9612% người chơi thắng

level 10 : 99,4878%

b.Tính tỉ lệ sử dụng skill trung bình trong 1 ván chơi của những user ở brazil

logic xử lý:

Lọc bản ghi có:

event_name = 'use_skill'

geo.country = 'Brazil'

Mỗi bản ghi event_name = 'use_skill' tương ứng với 1 lần dùng skill --> count(*) trên tập dữ liệu đã lọc là tổng số lần dùng skill

Mỗi session được nhận diện bởi cặp: user_id + ga_session_id --> → Lấy tập các (user_id, ga_session_id) duy nhất → đếm số lượng → số session

--> tỉ lệ sử dụng skill trung bình trên mỗi session:

In [None]:


# query = """ SELECT COUNT(*) AS brazil_user_records FROM df WHERE geo_country = 'Brazil';"""
# result = pysqldf(query)

# query2 = """ SELECT COUNT(*) AS brazil_use_skill_count FROM df WHERE event_name = 'use_skill' AND geo_country = 'Brazil'; """
# result2 = pysqldf(query2)



   brazil_user_records
0                11023
   brazil_use_skill_count
0                     399


In [None]:
import pandas as pd
from sqlalchemy import create_engine

# Kết nối tới PostgreSQL
engine = create_engine('postgresql+psycopg2://tienth:tienth@localhost:5434/ke_toan_dw')

# Truy vấn SQL: tính tỷ lệ sử dụng skill trung bình trên mỗi session của user ở Brazil
sql_query = """
SELECT
    COUNT(*) * 1.0 / COUNT(DISTINCT user_id || '_' || ga_session_id) AS avg_skill_usage_per_session,
    COUNT(*) AS total_skill_usage_events,
    COUNT(DISTINCT user_id || '_' || ga_session_id) AS total_sessions
FROM event_logs_flat
WHERE event_name = 'use_skill'
  AND geo_country = 'Brazil'
  AND ga_session_id IS NOT NULL
"""

# Thực thi và hiển thị kết quả
df2 = pd.read_sql_query(sql_query, con=engine)
print(df2)


   avg_skill_usage_per_session  total_skill_usage_events  total_sessions
0                     2.471935                     27216           11010


Tỉ lệ sử dụng skill trung bình trong 1 ván chơi của những user ở brazil là 2.47

C. Tìm tỉ lệ user còn ở lại chơi game qua từng level

Với mỗi level, đếm số lượng DISTINCT user_id đã bắt đầu level đó (level_start)

Đếm số lượng user duy nhất ở mỗi level (tức là user còn ở lại đến level đó).

Tính tỷ lệ user còn lại tại level N so với số lượng user bắt đầu tại level 1 (coi level 1 là điểm gốc).

Tính retention rate qua từng level:

In [27]:
import pandas as pd
from sqlalchemy import create_engine

# Kết nối đến PostgreSQL
engine = create_engine('postgresql+psycopg2://tienth:tienth@localhost:5434/ke_toan_dw')

# Câu truy vấn SQL tính retention rate qua từng level
sql_query = """
WITH users_per_level AS (
    SELECT
        level::int AS level,
        COUNT(DISTINCT user_id) AS user_count
    FROM event_logs_flat
    WHERE event_name = 'level_start'
      AND level IS NOT NULL
    GROUP BY level::int
),
first_level AS (
    SELECT user_count AS total_users_level_1
    FROM users_per_level
    WHERE level = 1
)

SELECT
    upl.level,
    upl.user_count,
    ROUND(upl.user_count * 1.0 / fl.total_users_level_1, 4) AS retention_rate
FROM users_per_level upl
JOIN first_level fl ON TRUE
ORDER BY upl.level;
"""

# Thực thi câu truy vấn và đọc kết quả vào DataFrame
df = pd.read_sql_query(sql_query, engine)

# In kết quả
print(df)

# (Tùy chọn) Vẽ biểu đồ retention nếu muốn
# import matplotlib.pyplot as plt
# plt.plot(df['level'], df['retention_rate'], marker='o')
# plt.title('User Retention Rate by Level')
# plt.xlabel('Level')
# plt.ylabel('Retention Rate')
# plt.grid(True)
# plt.show()


      level  user_count  retention_rate
0         1       40914          1.0000
1         2       35673          0.8719
2         3       33199          0.8114
3         4       29754          0.7272
4         5       26888          0.6572
...     ...         ...             ...
3466  10746           1          0.0000
3467  10747           1          0.0000
3468  10748           1          0.0000
3469  10749           1          0.0000
3470  15401           1          0.0000

[3471 rows x 3 columns]
