In [None]:
"""
LinkedIn Parquet Dataset Analysis Script
Analyzes 15.2GB parquet file with 20M rows for Semantic Talent Finder project

This script processes large parquet files in chunks to:
1. Extract schema and data type information
2. Analyze data quality and completeness
3. Generate insights for Java model optimization
4. Provide database schema recommendations
5. Configure processing pipeline parameters
"""

import pandas as pd
import numpy as np
import pyarrow.parquet as pq
import json
import os
from collections import Counter, defaultdict
from datetime import datetime
import gc
import psutil

# --- Configuration -----------------------------------------------------------
# NOTE(review): absolute, machine-specific paths; adjust before running elsewhere.
PARQUET_FILE = "/Users/chromatrical/CAREER/Local Linkedin DB/DataBase/USA_filtered.parquet"
OUTPUT_DIR = "/Users/chromatrical/CAREER/Side Projects/semantic-talent-finder/data/analysis_output"
# Rows per record batch when streaming the file; bounds how much is in memory at once.
CHUNK_SIZE = 50_000

print(
    "🚀 LinkedIn Parquet Dataset Analysis - Starting Setup...",
    f"📁 Target File: {PARQUET_FILE}",
    f"⭐ Chunk Size: {CHUNK_SIZE:,} rows",
    f"💾 Output Directory: {OUTPUT_DIR}",
    sep="\n",
)

# Make sure the output directory exists (no error if it already does).
os.makedirs(OUTPUT_DIR, exist_ok=True)
print(f"✅ Setup Complete - Output directory created at {OUTPUT_DIR}")

In [None]:
class LinkedInDataAnalyzer:
    """Collect schema, quality, and content insights from a large Parquet file.

    Results accumulate in ``self.insights``; the analysis steps attached to
    this class in later cells each fill in one of its sections.
    """

    def __init__(self, parquet_file_path, chunk_size=50000, output_dir=None):
        """Set up an analyzer for *parquet_file_path*.

        Args:
            parquet_file_path: Path to the Parquet file to analyze.
            chunk_size: Number of rows per record batch when streaming the file.
            output_dir: Directory for result files. Defaults to the module-level
                ``OUTPUT_DIR``; it is created if it does not exist.
        """
        self.parquet_file = parquet_file_path
        self.chunk_size = chunk_size
        # Conditional expression: OUTPUT_DIR is only looked up when no directory is given.
        self.output_dir = OUTPUT_DIR if output_dir is None else output_dir
        self.insights = {
            'schema_analysis': {},
            'data_quality': {},
            'content_analysis': {},
            'business_logic': {},
            'processing_recommendations': {},
            'database_schema': {}
        }

        # Ensure output directory exists
        os.makedirs(self.output_dir, exist_ok=True)

    def get_memory_usage(self):
        """Return this process's resident set size in megabytes (bytes / 1024**2)."""
        process = psutil.Process(os.getpid())
        return process.memory_info().rss / 1024 / 1024

    @staticmethod
    def _column_preview(names, limit=10):
        """Join the first *limit* names with ", ", appending "..." only if some were cut off."""
        shown = ', '.join(names[:limit])
        return f"{shown}..." if len(names) > limit else shown

    def analyze_parquet_schema(self):
        """Read the Parquet file's metadata (without loading row data) and record its schema.

        Fills ``self.insights['schema_analysis']`` with ``total_columns``,
        ``total_rows``, ``file_size_gb``, ``columns`` (name -> type / nullable /
        index) and ``column_names``. On failure the error is printed and stored
        as ``self.insights['schema_analysis'] = {'error': ...}``.
        """
        print("🔍 Analyzing Parquet Schema...")

        try:
            parquet_file = pq.ParquetFile(self.parquet_file)
            schema = parquet_file.schema_arrow
            metadata = parquet_file.metadata

            schema_info = {
                field.name: {
                    'type': str(field.type),
                    'nullable': field.nullable,
                    'index': i
                }
                for i, field in enumerate(schema)
            }

            self.insights['schema_analysis'] = {
                'total_columns': len(schema),
                'total_rows': metadata.num_rows,
                'file_size_gb': round(os.path.getsize(self.parquet_file) / (1024**3), 2),
                'columns': schema_info,
                'column_names': [field.name for field in schema]
            }

            print(f"✅ Schema Analysis Complete:")
            print(f"   - Total Rows: {metadata.num_rows:,}")
            print(f"   - Total Columns: {len(schema)}")
            print(f"   - File Size: {self.insights['schema_analysis']['file_size_gb']} GB")
            print(f"   - Columns: {self._column_preview(list(schema_info))}")

        except Exception as e:
            # Best effort: later cells read this section with .get() defaults,
            # so record the failure rather than aborting the notebook.
            print(f"❌ Schema analysis failed: {e}")
            self.insights['schema_analysis'] = {'error': str(e)}

# Build the shared analyzer instance used by the cells below
analyzer = LinkedInDataAnalyzer(parquet_file_path=PARQUET_FILE, chunk_size=CHUNK_SIZE)
print("📊 LinkedInDataAnalyzer initialized successfully")

In [None]:
# Run the schema step, then summarize what it found
analyzer.analyze_parquet_schema()

print("\n📋 Schema Summary:")
schema_info = analyzer.insights['schema_analysis']
print(f"Total Rows: {schema_info.get('total_rows', 0):,}")
print(f"Total Columns: {schema_info.get('total_columns', 0)}")
print(f"File Size: {schema_info.get('file_size_gb', 0)} GB")

print("\n🔍 Column Overview:")
columns = schema_info.get('columns', {})
# Preview at most 15 columns, numbered from 1
for position, (col_name, col_info) in enumerate(list(columns.items())[:15], start=1):
    null_label = "nullable" if col_info.get('nullable', True) else "not null"
    col_type = col_info.get('type', 'unknown')
    print(f"  {position:2d}. {col_name:<30} | {col_type:<15} | {null_label}")

remaining = len(columns) - 15
if remaining > 0:
    print(f"  ... and {remaining} more columns")

In [None]:
def analyze_data_quality_chunked(self, max_chunks=4):
    """Measure per-column null rates by streaming the Parquet file in batches.

    Reads batches of ``self.chunk_size`` rows and stores in
    ``self.insights['data_quality']``:
    - rows and chunks processed,
    - per-column null counts (plain ints) and percentages,
    - the pandas dtype of each column as seen in the first batch,
    - a completeness summary (high: < 5% null, medium: 5-25%, low: >= 25%).

    Failures are printed and otherwise ignored (best effort).

    Args:
        max_chunks: Stop after this many batches. The default of 4 keeps the
            200k-row sample at the default chunk size; pass ``None`` to scan
            the whole file.
    """
    print("\n🔍 Analyzing Data Quality in Chunks...")

    null_counts = defaultdict(int)
    data_types = {}
    chunk_count = 0
    total_rows_processed = 0

    try:
        parquet_file = pq.ParquetFile(self.parquet_file)

        for batch in parquet_file.iter_batches(batch_size=self.chunk_size):
            chunk_df = batch.to_pandas()
            chunk_count += 1
            total_rows_processed += len(chunk_df)

            # One vectorized null count for the whole chunk instead of one per column.
            for column, nulls in chunk_df.isnull().sum().items():
                # int() turns numpy.int64 into a plain int; otherwise json.dump's
                # default=str fallback (see save_insights) writes the count as a string.
                null_counts[column] += int(nulls)
                if column not in data_types:
                    data_types[column] = str(chunk_df[column].dtype)

            # Memory management
            del chunk_df
            gc.collect()

            if chunk_count % 50 == 0:
                print(f"   Processed {chunk_count} chunks ({total_rows_processed:,} rows)")
                print(f"   Memory usage: {self.get_memory_usage():.2f} MB")

            if max_chunks is not None and chunk_count >= max_chunks:
                break

        # Batches from one ParquetFile share its schema, so every column was
        # counted over the same number of rows.
        null_percentages = {
            column: round(count / total_rows_processed * 100, 2) if total_rows_processed else 0.0
            for column, count in null_counts.items()
        }

        summary = {
            'high_quality_fields': [col for col, pct in null_percentages.items() if pct < 5],
            'medium_quality_fields': [col for col, pct in null_percentages.items() if 5 <= pct < 25],
            'low_quality_fields': [col for col, pct in null_percentages.items() if pct >= 25]
        }

        self.insights['data_quality'] = {
            'total_rows_analyzed': total_rows_processed,
            'chunks_processed': chunk_count,
            'null_counts': dict(null_counts),
            'null_percentages': null_percentages,
            'data_types': data_types,
            'completeness_summary': summary
        }

        print(f"✅ Data Quality Analysis Complete:")
        print(f"   - Rows Analyzed: {total_rows_processed:,}")
        print(f"   - High Quality Fields: {len(summary['high_quality_fields'])}")
        print(f"   - Low Quality Fields: {len(summary['low_quality_fields'])}")

    except Exception as e:
        print(f"❌ Data quality analysis failed: {e}")

# Attach the data-quality step to the analyzer class, then run it
setattr(LinkedInDataAnalyzer, 'analyze_data_quality_chunked', analyze_data_quality_chunked)
analyzer.analyze_data_quality_chunked()

In [None]:
# Display data quality results
print("\n📊 Data Quality Analysis Results:")
quality_data = analyzer.insights['data_quality']

print(f"\nRows Analyzed: {quality_data.get('total_rows_analyzed', 0):,}")
print(f"Chunks Processed: {quality_data.get('chunks_processed', 0)}")

# One section per quality tier: (heading, summary key, rows to list, row marker)
completeness = quality_data.get('completeness_summary', {})
null_percentages = quality_data.get('null_percentages', {})
for heading, tier_key, limit, marker in (
    ("🟢 High Quality Fields", 'high_quality_fields', 10, "✅"),
    ("🟡 Medium Quality Fields", 'medium_quality_fields', 5, "⚠️ "),
    ("🔴 Low Quality Fields", 'low_quality_fields', 5, "❌"),
):
    tier_fields = completeness.get(tier_key, [])
    print(f"\n{heading} ({len(tier_fields)}):")
    for field in tier_fields[:limit]:
        null_pct = null_percentages.get(field, 0)
        print(f"  {marker} {field:<30} | {null_pct:5.1f}% null")

In [None]:
def analyze_content_characteristics(self, max_chunks=3, samples_per_chunk=1000):
    """Profile text-like columns and keep raw samples from skills-like columns.

    Text columns are the object- or string-dtype columns of the first batch;
    only the first 15 of them are profiled. For each profiled column, the
    min, max, and mean length of ``str(value)`` are computed over every
    non-null value in the analyzed batches. The 95th percentile is estimated
    from at most *samples_per_chunk* lengths spread evenly across each batch.
    Profiled columns whose name contains "skill", "expertise", or
    "competenc" also keep up to 20 raw non-null values per batch.

    Results go to ``self.insights['content_analysis']``. Failures are printed
    and otherwise ignored (best effort).

    Args:
        max_chunks: Stop after this many batches (default 3, i.e. 150k rows at
            the default chunk size); ``None`` scans the whole file.
        samples_per_chunk: Cap on lengths kept per column per batch for the
            percentile estimate, bounding memory on long scans.
    """
    print("\n🔍 Analyzing Content Characteristics...")

    text_columns = []
    text_stats = {}
    skills_analysis = {}
    chunk_count = 0

    try:
        parquet_file = pq.ParquetFile(self.parquet_file)

        for batch in parquet_file.iter_batches(batch_size=self.chunk_size):
            chunk_df = batch.to_pandas()

            if chunk_count == 0:  # First batch decides which columns count as text
                text_columns = [col for col in chunk_df.columns
                                if chunk_df[col].dtype == 'object' or 'string' in str(chunk_df[col].dtype)]
                print(f"   Identified text columns: {text_columns[:10]}...")  # Show first 10

            for col in text_columns[:15]:  # Limit to first 15 text columns for performance
                stats = text_stats.setdefault(col, {
                    'min': None, 'max': 0, 'total': 0, 'count': 0,
                    'sampled_lengths': [], 'sample_values': [],
                })

                non_null = chunk_df[col].dropna()
                if len(non_null) > 0:
                    # dropna() above guarantees no missing lengths, so int64 is safe.
                    lengths = non_null.astype(str).str.len().to_numpy(dtype='int64')

                    # Exact running min/max/mean so the reported max reflects every
                    # analyzed row rather than a small head sample of each batch.
                    chunk_min = int(lengths.min())
                    stats['min'] = chunk_min if stats['min'] is None else min(stats['min'], chunk_min)
                    stats['max'] = max(stats['max'], int(lengths.max()))
                    stats['total'] += int(lengths.sum())
                    stats['count'] += len(lengths)

                    # Evenly spaced subsample across the whole batch for the percentile.
                    step = max(1, -(-len(lengths) // max(1, samples_per_chunk)))  # ceiling division
                    stats['sampled_lengths'].append(lengths[::step])

                    # Sample values for analysis: every 10th of the first 100 non-null values
                    if len(stats['sample_values']) < 50:
                        stats['sample_values'].extend(non_null.iloc[:100:10].tolist())

                # Skills analysis for skills-related columns
                if any(keyword in col.lower() for keyword in ('skill', 'expertise', 'competenc')):
                    skills_analysis.setdefault(col, []).extend(non_null.iloc[:20].tolist())

            chunk_count += 1
            del chunk_df
            gc.collect()

            if max_chunks is not None and chunk_count >= max_chunks:
                break

        # Process text statistics
        content_insights = {}
        for col, stats in text_stats.items():
            if stats['count']:
                sampled = np.concatenate(stats['sampled_lengths'])
                content_insights[col] = {
                    'min_length': stats['min'],
                    'max_length': stats['max'],
                    'avg_length': round(stats['total'] / stats['count'], 2),
                    'percentile_95': int(np.percentile(sampled, 95)),
                    'sample_values': stats['sample_values'][:3]  # First 3 samples
                }

        self.insights['content_analysis'] = {
            'text_columns': text_columns,
            'content_stats': content_insights,
            'skills_analysis': skills_analysis,
            'chunks_analyzed': chunk_count
        }

        print(f"✅ Content Analysis Complete:")
        print(f"   - Text Columns: {len(text_columns)}")
        print(f"   - Chunks Analyzed: {chunk_count}")

    except Exception as e:
        print(f"❌ Content analysis failed: {e}")

# Attach the content-analysis step to the analyzer class, then run it
setattr(LinkedInDataAnalyzer, 'analyze_content_characteristics', analyze_content_characteristics)
analyzer.analyze_content_characteristics()

In [None]:
# Display content analysis results
print("\n📝 Content Analysis Results:")
content_data = analyzer.insights['content_analysis']

print(f"Total Text Columns: {len(content_data.get('text_columns', []))}")
print(f"Analyzed Chunks: {content_data.get('chunks_analyzed', 0)}")

# Show content statistics for key fields
content_stats = content_data.get('content_stats', {})
print(f"\n📊 Text Field Length Analysis:")
for col_name, stats in list(content_stats.items())[:10]:  # Show first 10 fields
    print(f"  {col_name:<25} | Avg: {stats.get('avg_length', 0):6.1f} | Max: {stats.get('max_length', 0):5d} | 95%: {stats.get('percentile_95', 0):5d}")

# Show skills analysis if any found
skills_data = content_data.get('skills_analysis', {})
if skills_data:
    print(f"\n🎯 Skills Analysis:")
    for col_name, skills_list in skills_data.items():
        print(f"  {col_name}: {len(skills_list)} skill entries found")
        if skills_list:
            # Entries are raw values from object-dtype columns and need not be
            # strings; str() makes the 100-character preview work for any value.
            preview = str(skills_list[0])
            ellipsis = "..." if len(preview) > 100 else ""
            print(f"    Sample: {preview[:100]}{ellipsis}")

In [None]:
def generate_java_recommendations(self):
    """Derive JPA field suggestions and batch-processing settings from earlier insights.

    Reads ``schema_analysis``, ``data_quality``, and ``content_analysis`` from
    ``self.insights``. Stores the following under
    ``self.insights['processing_recommendations']``:
    - ``java_fields``: camelCase field name -> column mapping,
    - ``processing_config``.

    Only columns whose Arrow type string contains "string"/"object", "int",
    or "bool" get a field; other types are skipped.
    """
    print("\n🔍 Generating Java Recommendations...")

    schema = self.insights.get('schema_analysis', {})
    quality = self.insights.get('data_quality', {})
    content = self.insights.get('content_analysis', {})
    null_percentages = quality.get('null_percentages', {})
    content_stats = content.get('content_stats', {})

    java_fields = {}
    for col_name, col_info in schema.get('columns', {}).items():
        arrow_type = col_info.get('type', '').lower()
        annotation = f'@Column(name = "{col_name}")'

        if 'string' in arrow_type or 'object' in arrow_type:
            # NOTE(review): nested Arrow types such as "list<item: string>" also
            # match here and become String columns; confirm that is intended.
            java_type = 'String'
            max_length = 255  # used when the column was not profiled for content
            if col_name in content_stats:
                stats = content_stats[col_name]
                # Size from the longest observed value. Sizing from the 95th
                # percentile would reject roughly the longest 5% of values.
                observed = stats.get('max_length', stats.get('percentile_95', 255))
                max_length = max(500, observed)
            annotation = f'@Column(name = "{col_name}", length = {max_length})'
        elif 'int' in arrow_type:
            # NOTE(review): int64 columns map to Integer here, which overflows
            # beyond 2**31 - 1; consider Long/BIGINT if such values occur.
            java_type = 'Integer'
        elif 'bool' in arrow_type:
            java_type = 'Boolean'
        else:
            continue  # Unmapped Arrow type (e.g. double, timestamp): no field generated

        # Disambiguate names that collide after camelCase conversion (e.g. "a_b"
        # and "aB"); otherwise the later column silently replaces the earlier one.
        field_name = base_name = self.to_camel_case(col_name)
        suffix = 2
        while field_name in java_fields:
            field_name = f"{base_name}{suffix}"
            suffix += 1

        java_fields[field_name] = {
            'original_column': col_name,
            'java_type': java_type,
            'jpa_annotation': annotation,
            'nullable': col_info.get('nullable', True),
            'null_percentage': null_percentages.get(col_name, 0)
        }

    completeness = quality.get('completeness_summary', {})
    processing_config = {
        # A tenth of the analysis chunk size, clamped to 1,000-5,000 rows.
        'recommended_batch_size': min(5000, max(1000, self.chunk_size // 10)),
        # Rough heuristic: a tenth of this process's current memory usage.
        'memory_per_batch_mb': round(self.get_memory_usage() / 10, 2),
        # Assumes a throughput of 10,000 rows per minute.
        'estimated_processing_time_hours': round((schema.get('total_rows', 0) / 10000) / 60, 2),
        'high_priority_fields': completeness.get('high_quality_fields', []),
        'validation_required_fields': completeness.get('low_quality_fields', [])
    }

    self.insights['processing_recommendations'] = {
        'java_fields': java_fields,
        'processing_config': processing_config
    }

    print(f"✅ Java Recommendations Generated:")
    print(f"   - Java Fields: {len(java_fields)}")
    print(f"   - Recommended Batch Size: {processing_config['recommended_batch_size']}")

def to_camel_case(self, snake_str):
    """Return *snake_str* converted from snake_case to camelCase.

    The first underscore-separated segment is kept unchanged; each later
    segment is passed through ``str.capitalize`` and the underscores dropped.
    """
    head, *tail = snake_str.split('_')
    return head + ''.join(map(str.capitalize, tail))

# Attach the recommendation helpers to the analyzer class, then run the step
setattr(LinkedInDataAnalyzer, 'generate_java_recommendations', generate_java_recommendations)
setattr(LinkedInDataAnalyzer, 'to_camel_case', to_camel_case)
analyzer.generate_java_recommendations()

In [None]:
def generate_database_schema(self, max_columns=30):
    """Build PostgreSQL DDL for a ``profiles`` table from the Java field recommendations.

    Reads ``java_fields`` from ``self.insights['processing_recommendations']``
    and null counts / completeness from ``self.insights['data_quality']``.
    Stores ``create_table_sql`` (str) and ``indexes_sql`` (list of str) in
    ``self.insights['database_schema']``.

    Args:
        max_columns: Emit at most this many source columns (default 30);
            ``None`` emits all of them.
    """
    import re  # local import: only the DDL builder needs regular expressions

    print("\n🔍 Generating Database Schema...")

    java_fields = self.insights.get('processing_recommendations', {}).get('java_fields', {})
    quality = self.insights.get('data_quality', {})
    null_counts = quality.get('null_counts', {})

    # Common PostgreSQL reserved words that cannot be bare column names (not exhaustive).
    reserved = frozenset({
        'all', 'case', 'check', 'column', 'default', 'desc', 'else', 'end', 'from',
        'group', 'limit', 'offset', 'order', 'primary', 'references', 'select',
        'table', 'then', 'to', 'unique', 'user', 'when', 'where', 'window',
    })

    def quote_ident(name):
        """Return *name* as an SQL identifier, double-quoting it only when required."""
        if re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*', name) and name.lower() not in reserved:
            return name  # left bare so PostgreSQL's usual lower-case folding applies
        return '"' + name.replace('"', '""') + '"'

    selected = list(java_fields.values())
    if max_columns is not None:
        selected = selected[:max_columns]

    column_lines = ["id UUID PRIMARY KEY DEFAULT gen_random_uuid()"]
    emitted = set()
    for field_info in selected:
        col_name = field_info['original_column']
        java_type = field_info['java_type']

        # Determine SQL type
        if java_type == 'String':
            match = re.search(r'length\s*=\s*(\d+)', field_info.get('jpa_annotation', ''))
            sql_type = f"VARCHAR({match.group(1) if match else '255'})"
        elif java_type == 'Integer':
            sql_type = "INTEGER"
        elif java_type == 'Boolean':
            sql_type = "BOOLEAN"
        else:
            sql_type = "TEXT"

        # NOT NULL only when the source schema forbids nulls or the analyzed rows
        # contained none: any null at all makes inserts into a NOT NULL column fail.
        # NOTE(review): null counts may cover only a sample of the file.
        no_nulls_seen = col_name in null_counts and null_counts[col_name] == 0
        not_null = " NOT NULL" if (not field_info.get('nullable', True) or no_nulls_seen) else ""

        # NOTE(review): a source column named id/embedding/created_at/updated_at
        # would duplicate one of the generated columns.
        column_lines.append(f"{quote_ident(col_name)} {sql_type}{not_null}")
        emitted.add(col_name)

    column_lines += [
        "embedding vector(1536)",  # 1536-dimensional pgvector column
        "created_at TIMESTAMP DEFAULT NOW()",
        "updated_at TIMESTAMP DEFAULT NOW()",
    ]
    create_table = (
        # The vector type is provided by the pgvector extension.
        "CREATE EXTENSION IF NOT EXISTS vector;\n\n"
        "CREATE TABLE profiles (\n"
        + ",\n".join(f"    {line}" for line in column_lines)
        + "\n);"
    )

    # CREATE INDEX CONCURRENTLY cannot be executed inside a transaction block.
    indexes = ["CREATE INDEX CONCURRENTLY profiles_embedding_hnsw_idx ON profiles USING hnsw (embedding vector_cosine_ops);"]

    # Index up to five high-quality fields, but only ones that are actual table columns.
    high_quality_fields = quality.get('completeness_summary', {}).get('high_quality_fields', [])
    candidates = [f for f in high_quality_fields if f in emitted and f != 'id']
    for field in candidates[:5]:
        index_name = quote_ident("idx_profiles_" + re.sub(r'\W+', '_', field).lower())
        indexes.append(f"CREATE INDEX CONCURRENTLY {index_name} ON profiles({quote_ident(field)});")

    self.insights['database_schema'] = {
        'create_table_sql': create_table,
        'indexes_sql': indexes
    }

    print(f"✅ Database Schema Generated")

def save_insights(self):
    """Write the collected insights and generated DDL to the output directory.

    Uses ``self.output_dir`` when the analyzer has one, otherwise the
    module-level ``OUTPUT_DIR``. Writes two files:
    - ``linkedin_analysis_insights.json``: all of ``self.insights``; values
      JSON cannot encode are written via ``str``.
    - ``optimized_schema.sql``: the CREATE TABLE SQL followed by one index
      statement per line.

    Returns:
        Tuple of (insights JSON path, schema SQL path).
    """
    print("\n💾 Saving Analysis Results...")

    # `or` keeps OUTPUT_DIR from being looked up when the analyzer has its own directory.
    output_dir = getattr(self, 'output_dir', None) or OUTPUT_DIR
    os.makedirs(output_dir, exist_ok=True)
    db_schema = self.insights.get('database_schema', {})

    # Save JSON insights
    insights_file = os.path.join(output_dir, 'linkedin_analysis_insights.json')
    with open(insights_file, 'w', encoding='utf-8') as f:
        json.dump(self.insights, f, indent=2, default=str)

    # Save database schema; explicit UTF-8 because column names may be non-ASCII
    schema_file = os.path.join(output_dir, 'optimized_schema.sql')
    with open(schema_file, 'w', encoding='utf-8') as f:
        f.write(db_schema.get('create_table_sql', ''))
        f.write('\n\n-- Indexes\n')
        f.writelines(index + '\n' for index in db_schema.get('indexes_sql', []))

    print(f"✅ Results Saved to: {output_dir}")
    print(f"   - Insights: linkedin_analysis_insights.json")
    print(f"   - Database Schema: optimized_schema.sql")

    return insights_file, schema_file

# Attach the schema-generation and saving steps, then run both
setattr(LinkedInDataAnalyzer, 'generate_database_schema', generate_database_schema)
setattr(LinkedInDataAnalyzer, 'save_insights', save_insights)
analyzer.generate_database_schema()
insights_file, schema_file = analyzer.save_insights()

In [None]:
# Display final analysis summary
print("\n" + "="*80)
print("🎉 LINKEDIN DATASET ANALYSIS COMPLETE")
print("="*80)

# Show key insights
schema_info = analyzer.insights['schema_analysis']
quality_info = analyzer.insights['data_quality']
# .get chain: if the recommendations step did not populate this section,
# fall back to defaults (like the other display cells) instead of a KeyError.
processing_info = analyzer.insights.get('processing_recommendations', {}).get('processing_config', {})

print(f"\n📊 DATASET OVERVIEW:")
print(f"   📁 File Size: {schema_info.get('file_size_gb', 0)} GB")
print(f"   📋 Total Rows: {schema_info.get('total_rows', 0):,}")
print(f"   🗂️  Total Columns: {schema_info.get('total_columns', 0)}")
print(f"   🔍 Rows Analyzed: {quality_info.get('total_rows_analyzed', 0):,}")

print(f"\n🎯 DATA QUALITY SUMMARY:")
completeness = quality_info.get('completeness_summary', {})
print(f"   🟢 High Quality Fields: {len(completeness.get('high_quality_fields', []))}")
print(f"   🟡 Medium Quality Fields: {len(completeness.get('medium_quality_fields', []))}")
print(f"   🔴 Low Quality Fields: {len(completeness.get('low_quality_fields', []))}")

print(f"\n⚙️ PROCESSING RECOMMENDATIONS:")
print(f"   📦 Recommended Batch Size: {processing_info.get('recommended_batch_size', 0):,}")
print(f"   💾 Memory per Batch: {processing_info.get('memory_per_batch_mb', 0):.1f} MB")
print(f"   ⏱️  Est. Processing Time: {processing_info.get('estimated_processing_time_hours', 0):.1f} hours")

print(f"\n📁 OUTPUT FILES GENERATED:")
print(f"   📋 Analysis Report: {insights_file}")
print(f"   🗃️  Database Schema: {schema_file}")

print(f"\n🚀 NEXT STEPS:")
print("   1. Review the generated analysis files")
print("   2. Use the database schema for your PostgreSQL setup")
print("   3. Apply the processing recommendations to your Java application")
print("   4. Use the identified high-quality fields for core functionality")

print("\n" + "="*80)